File: System\Collections\Concurrent\ConcurrentBag.cs
Web Access
Project: src\src\libraries\System.Collections.Concurrent\src\System.Collections.Concurrent.csproj (System.Collections.Concurrent)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
 
namespace System.Collections.Concurrent
{
    /// <summary>
    /// Represents a thread-safe, unordered collection of objects.
    /// </summary>
    /// <typeparam name="T">Specifies the type of elements in the bag.</typeparam>
    /// <remarks>
    /// <para>
    /// Bags are useful for storing objects when ordering doesn't matter, and unlike sets, bags support
    /// duplicates. <see cref="ConcurrentBag{T}"/> is a thread-safe bag implementation, optimized for
    /// scenarios where the same thread will be both producing and consuming data stored in the bag.
    /// </para>
    /// <para>
    /// <see cref="ConcurrentBag{T}"/> accepts null reference (Nothing in Visual Basic) as a valid
    /// value for reference types.
    /// </para>
    /// <para>
    /// All public and protected members of <see cref="ConcurrentBag{T}"/> are thread-safe and may be used
    /// concurrently from multiple threads.
    /// </para>
    /// </remarks>
    [DebuggerTypeProxy(typeof(IProducerConsumerCollectionDebugView<>))]
    [DebuggerDisplay("Count = {Count}")]
    public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        /// <summary>The per-bag, per-thread work-stealing queues.</summary>
        private readonly ThreadLocal<WorkStealingQueue> _locals;
        /// <summary>The head work stealing queue in a linked list of queues.</summary>
        private volatile WorkStealingQueue? _workStealingQueues;
        /// <summary>Number of times any list transitions from empty to non-empty.</summary>
        private long _emptyToNonEmptyListTransitionCount;
 
        /// <summary>Initializes a new instance of the <see cref="ConcurrentBag{T}"/> class.</summary>
        public ConcurrentBag()
        {
            _locals = new ThreadLocal<WorkStealingQueue>();
        }
 
        /// <summary>
        /// Initializes a new instance of the <see cref="ConcurrentBag{T}"/>
        /// class that contains elements copied from the specified collection.
        /// </summary>
        /// <param name="collection">The collection whose elements are copied to the new <see
        /// cref="ConcurrentBag{T}"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is a null reference
        /// (Nothing in Visual Basic).</exception>
        public ConcurrentBag(IEnumerable<T> collection)
        {
            ArgumentNullException.ThrowIfNull(collection);
 
            _locals = new ThreadLocal<WorkStealingQueue>();
 
            WorkStealingQueue queue = GetCurrentThreadWorkStealingQueue(forceCreate: true)!;
            foreach (T item in collection)
            {
                queue.LocalPush(item, ref _emptyToNonEmptyListTransitionCount);
            }
        }
 
        /// <summary>
        /// Adds an object to the <see cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <param name="item">The object to be added to the
        /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
        /// (Nothing in Visual Basic) for reference types.</param>
        public void Add(T item) =>
            GetCurrentThreadWorkStealingQueue(forceCreate: true)!
            .LocalPush(item, ref _emptyToNonEmptyListTransitionCount);
 
        /// <summary>
        /// Attempts to add an object to the <see cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <param name="item">The object to be added to the
        /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
        /// (Nothing in Visual Basic) for reference types.</param>
        /// <returns>Always returns true</returns>
        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            Add(item);
            return true;
        }
 
        /// <summary>
        /// Attempts to remove and return an object from the <see cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <param name="result">When this method returns, <paramref name="result"/> contains the object
        /// removed from the <see cref="ConcurrentBag{T}"/> or the default value
        /// of <typeparamref name="T"/> if the operation failed.</param>
        /// <returns>true if an object was removed successfully; otherwise, false.</returns>
        public bool TryTake([MaybeNullWhen(false)] out T result)
        {
            WorkStealingQueue? queue = GetCurrentThreadWorkStealingQueue(forceCreate: false);
            return (queue != null && queue.TryLocalPop(out result)) || TrySteal(out result, take: true);
        }
 
        /// <summary>
        /// Attempts to return an object from the <see cref="ConcurrentBag{T}"/> without removing it.
        /// </summary>
        /// <param name="result">When this method returns, <paramref name="result"/> contains an object from
        /// the <see cref="ConcurrentBag{T}"/> or the default value of
        /// <typeparamref name="T"/> if the operation failed.</param>
        /// <returns>true if and object was returned successfully; otherwise, false.</returns>
        public bool TryPeek([MaybeNullWhen(false)] out T result)
        {
            WorkStealingQueue? queue = GetCurrentThreadWorkStealingQueue(forceCreate: false);
            return (queue != null && queue.TryLocalPeek(out result)) || TrySteal(out result, take: false);
        }
 
        /// <summary>Gets the work-stealing queue data structure for the current thread.</summary>
        /// <param name="forceCreate">Whether to create a new queue if this thread doesn't have one.</param>
        /// <returns>The local queue object, or null if the thread doesn't have one.</returns>
        private WorkStealingQueue? GetCurrentThreadWorkStealingQueue(bool forceCreate) =>
            _locals.Value ??
            (forceCreate ? CreateWorkStealingQueueForCurrentThread() : null);
 
        private WorkStealingQueue CreateWorkStealingQueueForCurrentThread()
        {
            lock (GlobalQueuesLock) // necessary to update _workStealingQueues, so as to synchronize with freezing operations
            {
                WorkStealingQueue? head = _workStealingQueues;
 
                WorkStealingQueue? queue = head != null ? GetUnownedWorkStealingQueue() : null;
                if (queue == null)
                {
                    _workStealingQueues = queue = new WorkStealingQueue(head);
                }
                _locals.Value = queue;
 
                return queue;
            }
        }
 
        /// <summary>
        /// Try to reuse an unowned queue.  If a thread interacts with the bag and then exits,
        /// the bag purposefully retains its queue, as it contains data associated with the bag.
        /// </summary>
        /// <returns>The queue object, or null if no unowned queue could be gathered.</returns>
        private WorkStealingQueue? GetUnownedWorkStealingQueue()
        {
            Debug.Assert(Monitor.IsEntered(GlobalQueuesLock));
 
            // Look for a thread that has the same ID as this one.  It won't have come from the same thread,
            // but if our thread ID is reused, we know that no other thread can have the same ID and thus
            // no other thread can be using this queue.
            int currentThreadId = Environment.CurrentManagedThreadId;
            for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
            {
                if (queue._ownerThreadId == currentThreadId)
                {
                    return queue;
                }
            }
 
            return null;
        }
 
        /// <summary>Local helper method to steal an item from any other non empty thread.</summary>
        /// <param name="result">To receive the item retrieved from the bag</param>
        /// <param name="take">Whether to remove or peek.</param>
        /// <returns>True if succeeded, false otherwise.</returns>
        private bool TrySteal([MaybeNullWhen(false)] out T result, bool take)
        {
            if (CDSCollectionETWBCLProvider.Log.IsEnabled())
            {
                if (take)
                {
                    CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryTakeSteals();
                }
                else
                {
                    CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryPeekSteals();
                }
            }
 
            while (true)
            {
                // We need to track whether any lists transition from empty to non-empty both before
                // and after we attempt the steal in case we don't get an item:
                //
                // If we don't get an item, we need to handle the possibility of a race condition that led to
                // an item being added to a list after we already looked at it in a way that breaks
                // linearizability.  For example, say there are three threads 0, 1, and 2, each with their own
                // list that's currently empty.  We could then have the following series of operations:
                // - Thread 2 adds an item, such that there's now 1 item in the bag.
                // - Thread 1 sees that the count is 1 and does a Take. Its local list is empty, so it tries to
                //   steal from list 0, but it's empty.  Before it can steal from Thread 2, it's pre-empted.
                // - Thread 0 adds an item.  The count is now 2.
                // - Thread 2 takes an item, which comes from its local queue.  The count is now 1.
                // - Thread 1 continues to try to steal from 2, finds it's empty, and fails its take, even though
                //   at any given time during its take the count was >= 1.  Oops.
                // This is particularly problematic for wrapper types that track count using their own synchronization,
                // e.g. BlockingCollection, and thus expect that a take will always be successful if the number of items
                // is known to be > 0.
                //
                // We work around this by looking at the number of times any list transitions from == 0 to > 0,
                // checking that before and after the steal attempts.  We don't care about > 0 to > 0 transitions,
                // because a steal from a list with > 0 elements would have been successful.
                long initialEmptyToNonEmptyCounts = Interlocked.Read(ref _emptyToNonEmptyListTransitionCount);
 
                // If there's no local queue for this thread, just start from the head queue
                // and try to steal from each queue until we get a result. If there is a local queue from this thread,
                // then start from the next queue after it, and then iterate around back from the head to this queue,
                // not including it.
                WorkStealingQueue? localQueue = GetCurrentThreadWorkStealingQueue(forceCreate: false);
                bool gotItem = localQueue == null ?
                    TryStealFromTo(_workStealingQueues, null, out result, take) :
                    (TryStealFromTo(localQueue._nextQueue, null, out result, take) || TryStealFromTo(_workStealingQueues, localQueue, out result, take));
                if (gotItem)
                {
#pragma warning disable CS8762
                    // https://github.com/dotnet/runtime/issues/36132
                    // Compiler can't automatically deduce that nullability constraints
                    // for 'result' are satisfied at this exit point.
                    return true;
#pragma warning restore CS8762
                }
 
                if (Interlocked.Read(ref _emptyToNonEmptyListTransitionCount) == initialEmptyToNonEmptyCounts)
                {
                    // The version number matched, so we didn't get an item and we're confident enough
                    // in our steal attempt to say so.
                    return false;
                }
 
                // Some list transitioned from empty to non-empty between just before the steal and now.
                // Since we don't know if it caused a race condition like the above description, we
                // have little choice but to try to steal again.
            }
        }
 
        /// <summary>
        /// Attempts to steal from each queue starting from <paramref name="startInclusive"/> to <paramref name="endExclusive"/>.
        /// </summary>
        private static bool TryStealFromTo(WorkStealingQueue? startInclusive, WorkStealingQueue? endExclusive, [MaybeNullWhen(false)] out T result, bool take)
        {
            for (WorkStealingQueue? queue = startInclusive; queue != endExclusive; queue = queue._nextQueue)
            {
                if (queue!.TrySteal(out result, take))
                {
                    return true;
                }
            }
 
            result = default(T);
            return false;
        }
 
        /// <summary>
        /// Copies the <see cref="ConcurrentBag{T}"/> elements to an existing
        /// one-dimensional <see cref="System.Array">Array</see>, starting at the specified array
        /// index.
        /// </summary>
        /// <param name="array">The one-dimensional <see cref="System.Array">Array</see> that is the
        /// destination of the elements copied from the
        /// <see cref="ConcurrentBag{T}"/>. The <see
        /// cref="System.Array">Array</see> must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying
        /// begins.</param>
        /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
        /// Visual Basic).</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
        /// zero.</exception>
        /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
        /// length of the <paramref name="array"/>
        /// -or- the number of elements in the source <see
        /// cref="ConcurrentBag{T}"/> is greater than the available space from
        /// <paramref name="index"/> to the end of the destination <paramref name="array"/>.</exception>
        public void CopyTo(T[] array, int index)
        {
            ArgumentNullException.ThrowIfNull(array);
            ArgumentOutOfRangeException.ThrowIfNegative(index);
 
            // Short path if the bag is empty
            if (_workStealingQueues == null)
            {
                return;
            }
 
            bool lockTaken = false;
            try
            {
                FreezeBag(ref lockTaken);
 
                // Make sure we won't go out of bounds on the array
                int count = DangerousCount;
                if (index > array.Length - count)
                {
                    throw new ArgumentException(SR.Collection_CopyTo_TooManyElems, nameof(index));
                }
 
                // Do the copy
                try
                {
                    int copied = CopyFromEachQueueToArray(array, index);
                    Debug.Assert(copied == count);
                }
                catch (ArrayTypeMismatchException e)
                {
                    // Propagate same exception as in desktop
                    throw new InvalidCastException(e.Message, e);
                }
            }
            finally
            {
                UnfreezeBag(lockTaken);
            }
        }
 
        /// <summary>Copies from each queue to the target array, starting at the specified index.</summary>
        private int CopyFromEachQueueToArray(T[] array, int index)
        {
            Debug.Assert(Monitor.IsEntered(GlobalQueuesLock));
 
            int i = index;
            for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
            {
                i += queue.DangerousCopyTo(array, i);
            }
            return i - index;
        }
 
        /// <summary>
        /// Copies the elements of the <see cref="System.Collections.ICollection"/> to an <see
        /// cref="System.Array"/>, starting at a particular
        /// <see cref="System.Array"/> index.
        /// </summary>
        /// <param name="array">The one-dimensional <see cref="System.Array">Array</see> that is the
        /// destination of the elements copied from the
        /// <see cref="ConcurrentBag{T}"/>. The <see
        /// cref="System.Array">Array</see> must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying
        /// begins.</param>
        /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
        /// Visual Basic).</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
        /// zero.</exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="array"/> is multidimensional. -or-
        /// <paramref name="array"/> does not have zero-based indexing. -or-
        /// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
        /// -or- The number of elements in the source <see cref="System.Collections.ICollection"/> is
        /// greater than the available space from <paramref name="index"/> to the end of the destination
        /// <paramref name="array"/>. -or- The type of the source <see
        /// cref="System.Collections.ICollection"/> cannot be cast automatically to the type of the
        /// destination <paramref name="array"/>.
        /// </exception>
        void ICollection.CopyTo(Array array, int index)
        {
            // If the destination is actually a T[], use the strongly-typed
            // overload that doesn't allocate/copy an extra array.
            T[]? szArray = array as T[];
            if (szArray != null)
            {
                CopyTo(szArray, index);
                return;
            }
 
            // Otherwise, fall back to first storing the contents to an array,
            // and then relying on its CopyTo to copy to the target Array.
            ArgumentNullException.ThrowIfNull(array);
            ToArray().CopyTo(array, index);
        }
 
        /// <summary>
        /// Copies the <see cref="ConcurrentBag{T}"/> elements to a new array.
        /// </summary>
        /// <returns>A new array containing a snapshot of elements copied from the <see
        /// cref="ConcurrentBag{T}"/>.</returns>
        public T[] ToArray()
        {
            if (_workStealingQueues != null)
            {
                bool lockTaken = false;
                try
                {
                    FreezeBag(ref lockTaken);
 
                    int count = DangerousCount;
                    if (count > 0)
                    {
                        var arr = new T[count];
                        int copied = CopyFromEachQueueToArray(arr, 0);
                        Debug.Assert(copied == count);
                        return arr;
                    }
                }
                finally
                {
                    UnfreezeBag(lockTaken);
                }
            }
 
            // Bag was empty
            return Array.Empty<T>();
        }
 
        /// <summary>
        /// Removes all values from the <see cref="ConcurrentBag{T}"/>.
        /// </summary>
        public void Clear()
        {
            // If there are no queues in the bag, there's nothing to clear.
            if (_workStealingQueues == null)
            {
                return;
            }
 
            // Clear the local queue.
            WorkStealingQueue? local = GetCurrentThreadWorkStealingQueue(forceCreate: false);
            if (local != null)
            {
                local.LocalClear();
                if (local._nextQueue == null && local == _workStealingQueues)
                {
                    // If it's the only queue, nothing more to do.
                    return;
                }
            }
 
            // Clear the other queues by stealing all remaining items. We freeze the bag to
            // avoid having to contend with too many new items being added while we're trying
            // to drain the bag. But we can't just freeze the bag and attempt to remove all
            // items from every other queue, as even with freezing the bag it's dangerous to
            // manipulate other queues' tail pointers and add/take counts.
            bool lockTaken = false;
            try
            {
                FreezeBag(ref lockTaken);
                for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
                {
                    T? ignored;
                    while (queue.TrySteal(out ignored, take: true)) ;
                }
            }
            finally
            {
                UnfreezeBag(lockTaken);
            }
        }
 
        /// <summary>
        /// Returns an enumerator that iterates through the <see
        /// cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <returns>An enumerator for the contents of the <see
        /// cref="ConcurrentBag{T}"/>.</returns>
        /// <remarks>
        /// The enumeration represents a moment-in-time snapshot of the contents
        /// of the bag.  It does not reflect any updates to the collection after
        /// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
        /// concurrently with reads from and writes to the bag.
        /// </remarks>
        public IEnumerator<T> GetEnumerator() => new Enumerator(ToArray());
 
        /// <summary>
        /// Returns an enumerator that iterates through the <see
        /// cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <returns>An enumerator for the contents of the <see
        /// cref="ConcurrentBag{T}"/>.</returns>
        /// <remarks>
        /// The items enumerated represent a moment-in-time snapshot of the contents
        /// of the bag.  It does not reflect any update to the collection after
        /// <see cref="GetEnumerator"/> was called.
        /// </remarks>
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
 
        /// <summary>
        /// Gets the number of elements contained in the <see cref="ConcurrentBag{T}"/>.
        /// </summary>
        /// <value>The number of elements contained in the <see cref="ConcurrentBag{T}"/>.</value>
        /// <remarks>
        /// The count returned represents a moment-in-time snapshot of the contents
        /// of the bag.  It does not reflect any updates to the collection after
        /// <see cref="GetEnumerator"/> was called.
        /// </remarks>
        public int Count
        {
            get
            {
                // Short path if the bag is empty
                if (_workStealingQueues == null)
                {
                    return 0;
                }
 
                bool lockTaken = false;
                try
                {
                    FreezeBag(ref lockTaken);
                    return DangerousCount;
                }
                finally
                {
                    UnfreezeBag(lockTaken);
                }
            }
        }
 
        /// <summary>Gets the number of items stored in the bag.</summary>
        /// <remarks>Only provides a stable result when the bag is frozen.</remarks>
        private int DangerousCount
        {
            get
            {
                int count = 0;
                for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
                {
                    checked { count += queue.DangerousCount; }
                }
 
                Debug.Assert(count >= 0);
                return count;
            }
        }
 
        /// <summary>
        /// Gets a value that indicates whether the <see cref="ConcurrentBag{T}"/> is empty.
        /// </summary>
        /// <value>true if the <see cref="ConcurrentBag{T}"/> is empty; otherwise, false.</value>
        public bool IsEmpty
        {
            get
            {
                // Fast-path based on the current thread's local queue.
                WorkStealingQueue? local = GetCurrentThreadWorkStealingQueue(forceCreate: false);
                if (local != null)
                {
                    // We don't need the lock to check the local queue, as no other thread
                    // could be adding to it, and a concurrent steal that transitions from
                    // non-empty to empty doesn't matter because if we see this as non-empty,
                    // then that's a valid moment-in-time answer, and if we see this as empty,
                    // we check other things.
                    if (!local.IsEmpty)
                    {
                        return false;
                    }
 
                    // We know the local queue is empty (no one besides this thread could have
                    // added to it since we checked).  If the local queue is the only one
                    // in the bag, then the bag is empty, too.
                    if (local._nextQueue == null && local == _workStealingQueues)
                    {
                        return true;
                    }
                }
 
                // Couldn't take a fast path. Freeze the bag, and enumerate the queues to see if
                // any is non-empty.
                bool lockTaken = false;
                try
                {
                    FreezeBag(ref lockTaken);
                    for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
                    {
                        if (!queue.IsEmpty)
                        {
                            return false;
                        }
                    }
                }
                finally
                {
                    UnfreezeBag(lockTaken);
                }
 
                // All queues were empty, so the bag was empty.
                return true;
            }
        }
 
        /// <summary>
        /// Gets a value indicating whether access to the <see cref="System.Collections.ICollection"/> is
        /// synchronized with the SyncRoot.
        /// </summary>
        /// <value>true if access to the <see cref="System.Collections.ICollection"/> is synchronized
        /// with the SyncRoot; otherwise, false. For <see cref="ConcurrentBag{T}"/>, this property always
        /// returns false.</value>
        bool ICollection.IsSynchronized => false;
 
        /// <summary>
        /// Gets an object that can be used to synchronize access to the <see
        /// cref="System.Collections.ICollection"/>. This property is not supported.
        /// </summary>
        /// <exception cref="System.NotSupportedException">The SyncRoot property is not supported.</exception>
        object ICollection.SyncRoot
        {
            get { throw new NotSupportedException(SR.ConcurrentCollection_SyncRoot_NotSupported); }
        }
 
        /// <summary>Global lock used to synchronize the queues pointer and all bag-wide operations (e.g. ToArray, Count, etc.).</summary>
        private object GlobalQueuesLock
        {
            get
            {
                Debug.Assert(_locals != null);
                return _locals;
            }
        }
 
        /// <summary>"Freezes" the bag, such that no concurrent operations will be mutating the bag when it returns.</summary>
        /// <param name="lockTaken">true if the global lock was taken; otherwise, false.</param>
        private void FreezeBag(ref bool lockTaken)
        {
            // Take the global lock to start freezing the bag.  This helps, for example,
            // to prevent other threads from joining the bag (adding their local queues)
            // while a global operation is in progress.
            Debug.Assert(!Monitor.IsEntered(GlobalQueuesLock));
            Monitor.Enter(GlobalQueuesLock, ref lockTaken);
            WorkStealingQueue? head = _workStealingQueues; // stable at least until GlobalQueuesLock is released in UnfreezeBag
 
            // Then acquire all local queue locks, noting on each that it's been taken.
            for (WorkStealingQueue? queue = head; queue != null; queue = queue._nextQueue)
            {
                Monitor.Enter(queue, ref queue._frozen);
            }
            Interlocked.MemoryBarrier(); // prevent reads of _currentOp from moving before writes to _frozen
 
            // Finally, wait for all unsynchronized operations on each queue to be done.
            for (WorkStealingQueue? queue = head; queue != null; queue = queue._nextQueue)
            {
                if (queue._currentOp != Operation.None)
                {
                    SpinWait spinner = default;
                    do { spinner.SpinOnce(); }
                    while (queue._currentOp != Operation.None);
                }
            }
        }
 
        /// <summary>"Unfreezes" a bag frozen with <see cref="FreezeBag(ref bool)"/>.</summary>
        /// <param name="lockTaken">The result of the <see cref="FreezeBag(ref bool)"/> method.</param>
        private void UnfreezeBag(bool lockTaken)
        {
            Debug.Assert(Monitor.IsEntered(GlobalQueuesLock) == lockTaken);
            if (lockTaken)
            {
                // Release all of the individual queue locks.
                for (WorkStealingQueue? queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
                {
                    if (queue._frozen)
                    {
                        queue._frozen = false;
                        Monitor.Exit(queue);
                    }
                }
 
                // Then release the global lock.
                Monitor.Exit(GlobalQueuesLock);
            }
        }
 
        /// <summary>Provides a work-stealing queue data structure stored per thread.</summary>
        private sealed class WorkStealingQueue
        {
            /// <summary>Initial size of the queue's array.</summary>
            private const int InitialSize = 32;
            /// <summary>Starting index for the head and tail indices.</summary>
            private const int StartIndex =
#if DEBUG
                int.MaxValue; // in debug builds, start at the end so we exercise the index reset logic
#else
                0;
#endif
            /// <summary>Head index from which to steal.  This and'd with the <see cref="_mask"/> is the index into <see cref="_array"/>.</summary>
            private volatile int _headIndex = StartIndex;
            /// <summary>Tail index at which local pushes/pops happen. This and'd with the <see cref="_mask"/> is the index into <see cref="_array"/>.</summary>
            private volatile int _tailIndex = StartIndex;
            /// <summary>The array storing the queue's data.</summary>
            private volatile T[] _array = new T[InitialSize];
            /// <summary>Mask and'd with <see cref="_headIndex"/> and <see cref="_tailIndex"/> to get an index into <see cref="_array"/>.</summary>
            private volatile int _mask = InitialSize - 1;
            /// <summary>Numbers of elements in the queue from the local perspective; needs to be combined with <see cref="_stealCount"/> to get an actual Count.</summary>
            private int _addTakeCount;
            /// <summary>Number of steals; needs to be combined with <see cref="_addTakeCount"/> to get an actual Count.</summary>
            private int _stealCount;
            /// <summary>The current queue operation. Used to quiesce before performing operations from one thread onto another.</summary>
            internal volatile Operation _currentOp;
            /// <summary>true if this queue's lock is held as part of a global freeze.</summary>
            internal bool _frozen;
            /// <summary>Next queue in the <see cref="ConcurrentBag{T}"/>'s set of thread-local queues.</summary>
            internal readonly WorkStealingQueue? _nextQueue;
            /// <summary>Thread ID that owns this queue.</summary>
            internal readonly int _ownerThreadId;
 
            /// <summary>Initialize the WorkStealingQueue.</summary>
            /// <param name="nextQueue">The next queue in the linked list of work-stealing queues.</param>
            internal WorkStealingQueue(WorkStealingQueue? nextQueue)
            {
                _ownerThreadId = Environment.CurrentManagedThreadId;
                _nextQueue = nextQueue;
            }
 
            /// <summary>Gets whether the queue is empty.</summary>
            internal bool IsEmpty
            {
                get
                {
                    // _tailIndex can be decremented even while the bag is frozen, as the decrement in TryLocalPop happens prior
                    // to the check for _frozen.  But that's ok, as if _tailIndex is being decremented such that _headIndex becomes
                    // >= _tailIndex, then the queue is about to be empty.  This does mean, though, that while holding the lock,
                    // it is possible to observe Count == 1 but IsEmpty == true.  As such, we simply need to avoid doing any operation
                    // while the bag is frozen that requires those values to be consistent.
                    return _headIndex - _tailIndex >= 0;
                }
            }
 
            /// <summary>
            /// Add new item to the tail of the queue.
            /// </summary>
            /// <param name="item">The item to add.</param>
            /// <param name="emptyToNonEmptyListTransitionCount"></param>
            internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
            {
                Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
                bool lockTaken = false;
                try
                {
                    // Full fence to ensure subsequent reads don't get reordered before this
                    Interlocked.Exchange(ref _currentOp, Operation.Add);
                    int tail = _tailIndex;
 
                    // Rare corner case (at most once every 2 billion pushes on this thread):
                    // We're going to increment the tail; if we'll overflow, then we need to reset our counts
                    if (tail == int.MaxValue)
                    {
                        _currentOp = Operation.None; // set back to None temporarily to avoid a deadlock
                        lock (this)
                        {
                            Debug.Assert(_tailIndex == tail, "No other thread should be changing _tailIndex");
 
                            // Rather than resetting to zero, we'll just mask off the bits we don't care about.
                            // This way we don't need to rearrange the items already in the queue; they'll be found
                            // correctly exactly where they are.  One subtlety here is that we need to make sure that
                            // if head is currently < tail, it remains that way.  This happens to just fall out from
                            // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all
                            // bits are set, so all of the bits we're keeping will also be set.  Thus it's impossible
                            // for the head to end up > than the tail, since you can't set any more bits than all of them.
                            _headIndex &= _mask;
                            _tailIndex = tail &= _mask;
                            Debug.Assert(_headIndex - _tailIndex <= 0);
 
                            Interlocked.Exchange(ref _currentOp, Operation.Add); // ensure subsequent reads aren't reordered before this
                        }
                    }
 
                    // We'd like to take the fast path that doesn't require locking, if possible. It's not possible if:
                    // - another thread is currently requesting that the whole bag synchronize, e.g. a ToArray operation
                    // - if there are fewer than two spaces available.  One space is necessary for obvious reasons:
                    //   to store the element we're trying to push.  The other is necessary due to synchronization with steals.
                    //   A stealing thread first increments _headIndex to reserve the slot at its old value, and then tries to
                    //   read from that slot.  We could potentially have a race condition whereby _headIndex is incremented just
                    //   before this check, in which case we could overwrite the element being stolen as that slot would appear
                    //   to be empty.  Thus, we only allow the fast path if there are two empty slots.
                    // - if there <= 1 elements in the list.  We need to be able to successfully track transitions from
                    //   empty to non-empty in a way that other threads can check, and we can only do that tracking
                    //   correctly if we synchronize with steals when it's possible a steal could take the last item
                    //   in the list just as we're adding.  It's possible at this point that there's currently an active steal
                    //   operation happening but that it hasn't yet incremented the head index, such that we could read a smaller
                    //   than accurate by 1 value for the head.  However, only one steal could possibly be doing so, as steals
                    //   take the lock, and another steal couldn't then increment the header further because it'll see that
                    //   there's currently an add operation in progress and wait until the add completes.
                    int head = _headIndex; // read after _currentOp set to Add
                    if (!_frozen && (head - (tail - 1) < 0) && (tail - (head + _mask) < 0))
                    {
                        _array[tail & _mask] = item;
                        _tailIndex = tail + 1;
                    }
                    else
                    {
                        // We need to contend with foreign operations (e.g. steals, enumeration, etc.), so we lock.
                        _currentOp = Operation.None; // set back to None to avoid a deadlock
                        Monitor.Enter(this, ref lockTaken);
 
                        head = _headIndex;
                        int count = tail - head; // this count is stable, as we're holding the lock
 
                        // If we're full, expand the array.
                        if (count >= _mask)
                        {
                            // Expand the queue by doubling its size.
                            var newArray = new T[_array.Length << 1];
                            int headIdx = head & _mask;
                            if (headIdx == 0)
                            {
                                Array.Copy(_array, newArray, _array.Length);
                            }
                            else
                            {
                                Array.Copy(_array, headIdx, newArray, 0, _array.Length - headIdx);
                                Array.Copy(_array, 0, newArray, _array.Length - headIdx, headIdx);
                            }
 
                            // Reset the field values
                            _array = newArray;
                            _headIndex = 0;
                            _tailIndex = tail = count;
                            _mask = (_mask << 1) | 1;
                        }
 
                        // Add the element
                        _array[tail & _mask] = item;
                        _tailIndex = tail + 1;
 
                        // Now that the item has been added, if we were at 0 (now at 1) item,
                        // increase the empty to non-empty transition count.
                        if (count == 0)
                        {
                            // We just transitioned from empty to non-empty, so increment the transition count.
                            Interlocked.Increment(ref emptyToNonEmptyListTransitionCount);
                        }
 
                        // Update the count to avoid overflow.  We can trust _stealCount here,
                        // as we're inside the lock and it's only manipulated there.
                        _addTakeCount -= _stealCount;
                        _stealCount = 0;
                    }
 
                    // Increment the count from the add/take perspective
                    checked { _addTakeCount++; }
                }
                finally
                {
                    _currentOp = Operation.None;
                    if (lockTaken)
                    {
                        Monitor.Exit(this);
                    }
                }
            }
 
            /// <summary>Clears the contents of the local queue.</summary>
            internal void LocalClear()
            {
                Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
                lock (this) // synchronize with steals
                {
                    // If the queue isn't empty, reset the state to clear out all items.
                    if (_headIndex - _tailIndex < 0)
                    {
                        _headIndex = _tailIndex = StartIndex;
                        _addTakeCount = _stealCount = 0;
                        Array.Clear(_array);
                    }
                }
            }
 
            /// <summary>Remove an item from the tail of the queue.</summary>
            /// <param name="result">The removed item</param>
            internal bool TryLocalPop([MaybeNullWhen(false)] out T result)
            {
                Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
 
                int tail = _tailIndex;
                if (_headIndex - tail >= 0)
                {
                    result = default(T);
                    return false;
                }
 
                bool lockTaken = false;
                try
                {
                    // Decrement the tail using a full fence to ensure subsequent reads don't reorder before this.
                    // If the read of _headIndex moved before this write to _tailIndex, we could erroneously end up
                    // popping an element that's concurrently being stolen, leading to the same element being
                    // dequeued from the bag twice.
                    _currentOp = Operation.Take;
                    Interlocked.Exchange(ref _tailIndex, --tail);
 
                    // If there is no interaction with a steal, we can head down the fast path.
                    // Note that we use _headIndex < tail rather than _headIndex <= tail to account
                    // for stealing peeks, which don't increment _headIndex, and which could observe
                    // the written default(T) in a race condition to peek at the element.
                    if (!_frozen && (_headIndex - tail < 0))
                    {
                        int idx = tail & _mask;
                        result = _array[idx];
                        if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
                        {
                            _array[idx] = default(T)!;
                        }
                        _addTakeCount--;
                        return true;
                    }
                    else
                    {
                        // Interaction with steals: 0 or 1 elements left.
                        _currentOp = Operation.None; // set back to None to avoid a deadlock
                        Monitor.Enter(this, ref lockTaken);
                        if (_headIndex - tail <= 0)
                        {
                            // Element still available. Take it.
                            int idx = tail & _mask;
                            result = _array[idx];
                            if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
                            {
                                _array[idx] = default(T)!;
                            }
                            _addTakeCount--;
                            return true;
                        }
                        else
                        {
                            // We encountered a race condition and the element was stolen, restore the tail.
                            _tailIndex = tail + 1;
                            result = default(T);
                            return false;
                        }
                    }
                }
                finally
                {
                    _currentOp = Operation.None;
                    if (lockTaken)
                    {
                        Monitor.Exit(this);
                    }
                }
            }
 
            /// <summary>Peek an item from the tail of the queue.</summary>
            /// <param name="result">the peeked item</param>
            /// <returns>True if succeeded, false otherwise</returns>
            internal bool TryLocalPeek([MaybeNullWhen(false)] out T result)
            {
                Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
 
                int tail = _tailIndex;
                if (_headIndex - tail < 0)
                {
                    // It is possible to enable lock-free peeks, following the same general approach
                    // that's used in TryLocalPop.  However, peeks are more complicated as we can't
                    // do the same kind of index reservation that's done in TryLocalPop; doing so could
                    // end up making a steal think that no item is available, even when one is. To do
                    // it correctly, then, we'd need to add spinning to TrySteal in case of a concurrent
                    // peek happening. With a lock, the common case (no contention with steals) will
                    // effectively only incur two interlocked operations (entering/exiting the lock) instead
                    // of one (setting Peek as the _currentOp).  Combined with Peeks on a bag being rare,
                    // for now we'll use the simpler/safer code.
                    lock (this)
                    {
                        if (_headIndex - tail < 0)
                        {
                            result = _array[(tail - 1) & _mask];
                            return true;
                        }
                    }
                }
 
                result = default(T);
                return false;
            }
 
            /// <summary>Steal an item from the head of the queue.</summary>
            /// <param name="result">the removed item</param>
            /// <param name="take">true to take the item; false to simply peek at it</param>
            internal bool TrySteal([MaybeNullWhen(false)] out T result, bool take)
            {
                lock (this)
                {
                    int head = _headIndex; // _headIndex is only manipulated under the lock
                    if (take)
                    {
                        // If there are <= 2 items in the list, we need to ensure no add operation
                        // is in progress, as add operations need to accurately count transitions
                        // from empty to non-empty, and they can only do that if there are no concurrent
                        // steal operations happening at the time.
                        if ((head - (_tailIndex - 2) >= 0) && _currentOp == Operation.Add)
                        {
                            SpinWait spinner = default;
                            do
                            {
                                spinner.SpinOnce();
                            }
                            while (_currentOp == Operation.Add);
                        }
 
                        // Increment head to tentatively take an element: a full fence is used to ensure the read
                        // of _tailIndex doesn't move earlier, as otherwise we could potentially end up stealing
                        // the same element that's being popped locally.
                        Interlocked.Exchange(ref _headIndex, unchecked(head + 1));
 
                        // If there's an element to steal, do it.
                        if (head < _tailIndex)
                        {
                            int idx = head & _mask;
                            result = _array[idx];
                            if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
                            {
                                _array[idx] = default(T)!;
                            }
                            _stealCount++;
                            return true;
                        }
                        else
                        {
                            // We contended with the local thread and lost the race, so restore the head.
                            _headIndex = head;
                        }
                    }
                    else if (head < _tailIndex)
                    {
                        // Peek, if there's an element available
                        result = _array[head & _mask];
                        return true;
                    }
                }
 
                // The queue was empty.
                result = default(T);
                return false;
            }
 
            /// <summary>Copies the contents of this queue to the target array starting at the specified index.</summary>
            internal int DangerousCopyTo(T[] array, int arrayIndex)
            {
                Debug.Assert(Monitor.IsEntered(this));
                Debug.Assert(_frozen);
                Debug.Assert(array != null);
                Debug.Assert(arrayIndex >= 0 && arrayIndex <= array.Length);
 
                int headIndex = _headIndex;
                int count = DangerousCount;
                Debug.Assert(
                    count == (_tailIndex - _headIndex) ||
                    count == (_tailIndex + 1 - _headIndex),
                    "Count should be the same as tail - head, but allowing for the possibility that " +
                    "a peek decremented _tailIndex before seeing that a freeze was happening.");
                Debug.Assert(arrayIndex <= array.Length - count);
 
                // Copy from this queue's array to the destination array, but in reverse
                // order to match the ordering of desktop.
                for (int i = arrayIndex + count - 1; i >= arrayIndex; i--)
                {
                    array[i] = _array[headIndex++ & _mask];
                }
 
                return count;
            }
 
            /// <summary>Gets the total number of items in the queue.</summary>
            /// <remarks>
            /// This is not thread safe, only providing an accurate result either from the owning
            /// thread while its lock is held or from any thread while the bag is frozen.
            /// </remarks>
            internal int DangerousCount
            {
                get
                {
                    Debug.Assert(Monitor.IsEntered(this));
                    int stealCount = _stealCount;
                    int addTakeCount = _addTakeCount;
                    int count = addTakeCount - stealCount;
                    Debug.Assert(count >= 0, $"Expected _addTakeCount ({addTakeCount}) >= _stealCount ({stealCount}).");
                    return count;
                }
            }
        }
 
        /// <summary>Lock-free operations performed on a queue.</summary>
        internal enum Operation
        {
            None,
            Add,
            Take
        };
 
        /// <summary>Provides an enumerator for the bag.</summary>
        /// <remarks>
        /// The original implementation of ConcurrentBag used a <see cref="List{T}"/> as part of
        /// the GetEnumerator implementation.  That list was then changed to be an array, but array's
        /// GetEnumerator has different behavior than does list's, in particular for the case where
        /// Current is used after MoveNext returns false.  To avoid any concerns around compatibility,
        /// we use a custom enumerator rather than just returning array's. This enumerator provides
        /// the essential elements of both list's and array's enumerators.
        /// </remarks>
        private sealed class Enumerator : IEnumerator<T>
        {
            private readonly T[] _array;
            private T? _current;
            private int _index;
 
            public Enumerator(T[] array)
            {
                Debug.Assert(array != null);
                _array = array;
            }
 
            public bool MoveNext()
            {
                if (_index < _array.Length)
                {
                    _current = _array[_index++];
                    return true;
                }
 
                _index = _array.Length + 1;
                return false;
            }
 
            public T Current => _current!;
 
            object? IEnumerator.Current
            {
                get
                {
                    if (_index == 0 || _index == _array.Length + 1)
                    {
                        throw new InvalidOperationException(SR.ConcurrentBag_Enumerator_EnumerationNotStartedOrAlreadyFinished);
                    }
                    return Current;
                }
            }
 
            public void Reset()
            {
                _index = 0;
                _current = default;
            }
 
            public void Dispose() { }
        }
    }
}