File: src\libraries\System.Private.CoreLib\src\System\Collections\Concurrent\ConcurrentQueueSegment.cs
Web Access
Project: src\src\coreclr\System.Private.CoreLib\System.Private.CoreLib.csproj (System.Private.CoreLib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
 
namespace System.Collections.Concurrent
{
    /// <summary>
    /// Provides a multi-producer, multi-consumer thread-safe bounded segment.  When the segment is full,
    /// enqueues fail and return false.  When the segment is empty, dequeues fail and return false.
    /// These segments are linked together to form the unbounded <see cref="ConcurrentQueue{T}"/>.
    /// </summary>
    [DebuggerDisplay("Capacity = {Capacity}")]
    internal sealed class ConcurrentQueueSegment<T>
    {
        // Segment design is inspired by the algorithm outlined at:
        // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

        /// <summary>The array of items in this queue.  Each slot contains the item in that slot and its "sequence number".</summary>
        internal readonly Slot[] _slots; // SOS's ThreadPool command depends on this name
        /// <summary>Mask for quickly accessing a position within the queue's array; equal to <c>_slots.Length - 1</c>.</summary>
        internal readonly int _slotsMask;
        /// <summary>The head and tail positions, with padding to help avoid false sharing contention.</summary>
        /// <remarks>Dequeuing happens from the head, enqueuing happens at the tail.</remarks>
        internal PaddedHeadAndTail _headAndTail; // mutable struct: do not make this readonly

        /// <summary>Indicates whether the segment has been marked such that dequeues don't overwrite the removed data.</summary>
        /// <remarks>Within this type it is only ever set (never cleared), by TryPeek when the peeked result is used.</remarks>
        internal bool _preservedForObservation;
        /// <summary>Indicates whether the segment has been marked such that no additional items may be enqueued.</summary>
        internal bool _frozenForEnqueues;
#pragma warning disable 0649 // some builds don't assign to this field
        /// <summary>The segment following this one in the queue, or null if this segment is the last in the queue.</summary>
        internal ConcurrentQueueSegment<T>? _nextSegment; // SOS's ThreadPool command depends on this name
#pragma warning restore 0649

        /// <summary>Creates the segment.</summary>
        /// <param name="boundedLength">
        /// The maximum number of elements the segment can contain.  Must be a power of 2 and at least 2.
        /// </param>
        internal ConcurrentQueueSegment(int boundedLength)
        {
            // Validate the length
            Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
            Debug.Assert(BitOperations.IsPow2(boundedLength), $"Must be a power of 2, got {boundedLength}");

            // Initialize the slots and the mask.  The mask lets us compute "% _slots.Length" more
            // cheaply as "& _slotsMask", which is only valid because the length is a power of 2.
            _slots = new Slot[boundedLength];
            _slotsMask = boundedLength - 1;

            // Initialize the sequence number for each slot.  The sequence number provides a ticket that
            // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can
            // enqueue.  An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer
            // for position N can dequeue when the sequence number is N + 1.  When an enqueuer is done writing
            // at position N, it sets the sequence number to N + 1 so that a dequeuer will be able to dequeue,
            // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length,
            // so that when an enqueuer loops around the slots, it'll find that the sequence number at
            // position N is N.  This also means that when an enqueuer finds that at position N the sequence
            // number is < N, there is still a value in that slot, i.e. the segment is full, and when a
            // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to
            // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into
            // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc.
            // may have values, but the 0th slot may still be being filled... in that case, TryDequeue and
            // TryPeek do not skip ahead or report the segment as empty; they spin and retry until the 0th
            // slot has been filled.)
            for (int i = 0; i < _slots.Length; i++)
            {
                _slots[i].SequenceNumber = i;
            }
        }

        /// <summary>Gets the number of elements this segment can store.</summary>
        internal int Capacity => _slots.Length;

        /// <summary>Gets the "freeze offset" for this segment.</summary>
        /// <remarks>
        /// Twice the capacity: a multiple of the capacity (so the frozen Tail still maps to the same slot)
        /// that is large enough to keep the frozen Tail ahead of every slot's sequence number.
        /// </remarks>
        internal int FreezeOffset => _slots.Length * 2;

        /// <summary>
        /// Ensures that the segment will not accept any subsequent enqueues that aren't already underway.
        /// </summary>
        /// <remarks>
        /// When we mark a segment as being frozen for additional enqueues,
        /// we set the <see cref="_frozenForEnqueues"/> bool, but that's mostly
        /// as a small helper to avoid marking it twice.  The real marking comes
        /// by modifying the Tail for the segment, increasing it by this
        /// <see cref="FreezeOffset"/>.  This effectively knocks it off the
        /// sequence expected by future enqueuers, such that any additional enqueuer
        /// will be unable to enqueue due to it not lining up with the expected
        /// sequence numbers.  This value is chosen specially so that Tail will grow
        /// to a value that maps to the same slot but that won't be confused with
        /// any other enqueue/dequeue sequence number.
        /// The check-then-set of the flag is not atomic on its own; it relies on the caller
        /// holding the queue's segment lock so that the offset is added at most once.
        /// </remarks>
        internal void EnsureFrozenForEnqueues() // must only be called while queue's segment lock is held
        {
            if (!_frozenForEnqueues) // flag used to ensure we don't increase the Tail more than once if frozen more than once
            {
                _frozenForEnqueues = true;
                Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
            }
        }

        /// <summary>Tries to dequeue an element from the queue.</summary>
        /// <param name="item">
        /// The dequeued element if the method returns true; otherwise, the default value of <typeparamref name="T"/>.
        /// </param>
        /// <returns>
        /// true if an element was dequeued; false if no enqueue position beyond the head has been
        /// reserved, i.e. the segment is empty.  If the head position has been reserved by an enqueuer
        /// that has not yet published its item, this method spins until the item is available.
        /// </returns>
        public bool TryDequeue([MaybeNullWhen(false)] out T item)
        {
            Slot[] slots = _slots;

            // Loop in case of contention...
            SpinWait spinner = default;
            while (true)
            {
                // Get the head at which to try to dequeue.
                int currentHead = Volatile.Read(ref _headAndTail.Head);
                int slotsIndex = currentHead & _slotsMask;

                // Read the sequence number for the head position.
                int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

                // We can dequeue from this slot if it's been filled by an enqueuer, which
                // would have left the sequence number at pos+1.
                int diff = sequenceNumber - (currentHead + 1);
                if (diff == 0)
                {
                    // We may be racing with other dequeuers.  Try to reserve the slot by incrementing
                    // the head.  Once we've done that, no one else will be able to read from this slot,
                    // and no enqueuer will be able to write to this slot until we've written the new
                    // sequence number. WARNING: The next few lines are not reliable on a runtime that
                    // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                    // but before the Volatile.Write, enqueuers trying to enqueue into this slot would
                    // spin indefinitely.  If this implementation is ever used on such a platform, this
                    // if block should be wrapped in a finally / prepared region.
                    if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
                    {
                        // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
                        // trying to dequeue from this slot will end up spinning until we do the subsequent Write.
                        item = slots[slotsIndex].Item!;
                        if (!Volatile.Read(ref _preservedForObservation))
                        {
                            // Not preserving: clear the slot (only needed when T holds references, so the GC
                            // can reclaim them) and publish the sequence number that lets an enqueuer reuse
                            // this slot on its next lap.  If we're preserving, though, we don't zero out the
                            // slot, as we need it for enumerations, peeking, ToArray, etc.  And we don't update
                            // the sequence number, so that an enqueuer will see it as full and be forced to move
                            // to a new segment.
                            if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
                            {
                                slots[slotsIndex].Item = default;
                            }
                            Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);
                        }
                        return true;
                    }

                    // The head was already advanced by another thread. A newer head has already been observed and the next
                    // iteration would make forward progress, so there's no need to spin-wait before trying again.
                }
                else if (diff < 0)
                {
                    // The sequence number was less than what we needed, which means this slot doesn't
                    // yet contain a value we can dequeue, i.e. the segment is empty.  Technically it's
                    // possible that multiple enqueuers could have written concurrently, with those
                    // getting later slots actually finishing first, so there could be elements after
                    // this one that are available, but we need to dequeue in order.  So before declaring
                    // failure and that the segment is empty, we check the tail to see if we're actually
                    // empty or if we're just waiting for items in flight or after this one to become available.
                    // If the segment is frozen, Tail has been advanced by FreezeOffset (see
                    // EnsureFrozenForEnqueues), so that offset is removed before comparing against the head.
                    bool frozen = _frozenForEnqueues;
                    int currentTail = Volatile.Read(ref _headAndTail.Tail);
                    if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                    {
                        item = default;
                        return false;
                    }

                    // It's possible it could have become frozen after we checked _frozenForEnqueues
                    // and before reading the tail.  That's ok: in that rare race condition, we just
                    // loop around again. This is not necessarily an always-forward-progressing
                    // situation since this thread is waiting for another to write to the slot and
                    // this thread may have to check the same slot multiple times. Spin-wait to avoid
                    // a potential busy-wait, and then try again.
                    spinner.SpinOnce(sleep1Threshold: -1);
                }
                else
                {
                    // The item was already dequeued by another thread. The head has already been updated beyond what was
                    // observed above, and the sequence number observed above as a volatile load is more recent than the update
                    // to the head. So, the next iteration of the loop is guaranteed to see a new head. Since this is an
                    // always-forward-progressing situation, there's no need to spin-wait before trying again.
                }
            }
        }

        /// <summary>Tries to peek at an element from the queue, without removing it.</summary>
        /// <param name="result">
        /// The element at the head of the segment if the method returns true and <paramref name="resultUsed"/>
        /// is true; otherwise, the default value of <typeparamref name="T"/>.
        /// </param>
        /// <param name="resultUsed">
        /// true if the caller will read <paramref name="result"/>.  In that case the segment is first marked as
        /// preserved for observation, so that dequeues stop clearing slots and the peeked value cannot be torn.
        /// When false, the segment is not marked and only the presence of an element is reported.
        /// </param>
        /// <returns>
        /// true if an element is available at the head of the segment; false if the segment is empty.
        /// If the head position has been reserved by an enqueuer that has not yet published its item,
        /// this method spins until the item is available.
        /// </returns>
        public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
        {
            if (resultUsed)
            {
                // In order to ensure we don't get a torn read on the value, we mark the segment
                // as preserving for observation.  Additional items can still be enqueued to this
                // segment, but no space will be freed during dequeues, such that the segment will
                // no longer be reusable.  The full fence orders this store before the reads below.
                _preservedForObservation = true;
                Interlocked.MemoryBarrier();
            }

            Slot[] slots = _slots;

            // Loop in case of contention...
            SpinWait spinner = default;
            while (true)
            {
                // Get the head at which to try to peek.
                int currentHead = Volatile.Read(ref _headAndTail.Head);
                int slotsIndex = currentHead & _slotsMask;

                // Read the sequence number for the head position.
                int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

                // We can peek from this slot if it's been filled by an enqueuer, which
                // would have left the sequence number at pos+1.
                int diff = sequenceNumber - (currentHead + 1);
                if (diff == 0)
                {
                    result = resultUsed ? slots[slotsIndex].Item! : default!;
                    return true;
                }
                else if (diff < 0)
                {
                    // The sequence number was less than what we needed, which means this slot doesn't
                    // yet contain a value we can peek, i.e. the segment is empty.  Technically it's
                    // possible that multiple enqueuers could have written concurrently, with those
                    // getting later slots actually finishing first, so there could be elements after
                    // this one that are available, but we need to peek in order.  So before declaring
                    // failure and that the segment is empty, we check the tail to see if we're actually
                    // empty or if we're just waiting for items in flight or after this one to become available.
                    // If the segment is frozen, Tail has been advanced by FreezeOffset (see
                    // EnsureFrozenForEnqueues), so that offset is removed before comparing against the head.
                    bool frozen = _frozenForEnqueues;
                    int currentTail = Volatile.Read(ref _headAndTail.Tail);
                    if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                    {
                        result = default;
                        return false;
                    }

                    // It's possible it could have become frozen after we checked _frozenForEnqueues
                    // and before reading the tail.  That's ok: in that rare race condition, we just
                    // loop around again. This is not necessarily an always-forward-progressing
                    // situation since this thread is waiting for another to write to the slot and
                    // this thread may have to check the same slot multiple times. Spin-wait to avoid
                    // a potential busy-wait, and then try again.
                    spinner.SpinOnce(sleep1Threshold: -1);
                }
                else
                {
                    // The item was already dequeued by another thread. The head has already been updated beyond what was
                    // observed above, and the sequence number observed above as a volatile load is more recent than the update
                    // to the head. So, the next iteration of the loop is guaranteed to see a new head. Since this is an
                    // always-forward-progressing situation, there's no need to spin-wait before trying again.
                }
            }
        }

        /// <summary>
        /// Attempts to enqueue the item.  If successful, the item will be stored
        /// in the queue and true will be returned; otherwise, the item won't be stored, and false
        /// will be returned.
        /// </summary>
        /// <param name="item">The item to store at the tail of the segment.</param>
        /// <returns>
        /// true if the item was stored; false if the segment is full, if a slot is being retained because
        /// the segment is preserved for observation, or if the segment has been frozen for enqueues.
        /// </returns>
        public bool TryEnqueue(T item)
        {
            Slot[] slots = _slots;

            // Loop in case of contention...
            while (true)
            {
                // Get the tail at which to try to enqueue.
                int currentTail = Volatile.Read(ref _headAndTail.Tail);
                int slotsIndex = currentTail & _slotsMask;

                // Read the sequence number for the tail position.
                int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

                // The slot is empty and ready for us to enqueue into it if its sequence
                // number matches the slot.
                int diff = sequenceNumber - currentTail;
                if (diff == 0)
                {
                    // We may be racing with other enqueuers.  Try to reserve the slot by incrementing
                    // the tail.  Once we've done that, no one else will be able to write to this slot,
                    // and no dequeuer will be able to read from this slot until we've written the new
                    // sequence number. WARNING: The next few lines are not reliable on a runtime that
                    // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                    // but before the Volatile.Write, other threads will spin trying to access this slot.
                    // If this implementation is ever used on such a platform, this if block should be
                    // wrapped in a finally / prepared region.
                    if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
                    {
                        // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
                        // trying to dequeue from this slot will end up spinning until we do the subsequent Write.
                        slots[slotsIndex].Item = item;
                        Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
                        return true;
                    }

                    // The tail was already advanced by another thread. A newer tail has already been observed and the next
                    // iteration would make forward progress, so there's no need to spin-wait before trying again.
                }
                else if (diff < 0)
                {
                    // The sequence number was less than what we needed, which means this slot still
                    // contains a value, i.e. the segment is full.  Technically it's possible that multiple
                    // dequeuers could have read concurrently, with those getting later slots actually
                    // finishing first, so there could be spaces after this one that are available, but
                    // we need to enqueue in order.  Once the segment has been frozen, Tail runs FreezeOffset
                    // ahead of the slots' sequence numbers, so enqueuers also end up here and fail.
                    return false;
                }
                else
                {
                    // The sequence number is ahead of the tail we observed: either another enqueuer has already
                    // filled this slot, or the slot was filled and has since been dequeued. In either case, the tail
                    // has already been updated beyond what was observed above, and the sequence number observed
                    // above as a volatile load is more recent than the update to the tail. So, the next iteration of
                    // the loop is guaranteed to see a new tail. Since this is an always-forward-progressing situation,
                    // there's no need to spin-wait before trying again.
                }
            }
        }

        /// <summary>Represents a slot in the queue.</summary>
        [StructLayout(LayoutKind.Auto)]
        [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
        internal struct Slot
        {
            /// <summary>The item.</summary>
            public T? Item; // SOS's ThreadPool command depends on this being at the beginning of the struct when T is a reference type
            /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
            public int SequenceNumber;
        }
    }
 
    /// <summary>
    /// Stores a segment's head and tail indices on separate cache lines, so that threads advancing
    /// the tail (enqueuers) and threads advancing the head (dequeuers) do not suffer false sharing.
    /// </summary>
    /// <remarks>
    /// The layout spans three cache lines: the first is leading padding, the second holds
    /// <see cref="Head"/>, and the third holds <see cref="Tail"/> followed by trailing padding.
    /// </remarks>
    [DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
    [StructLayout(LayoutKind.Explicit, Size = Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)]
    internal struct PaddedHeadAndTail
    {
        /// <summary>The position from which dequeuers next attempt to remove an item.</summary>
        [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE)]
        public int Head;

        /// <summary>The position at which enqueuers next attempt to store an item.</summary>
        [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
        public int Tail;
    }
}