File: System\Collections\Concurrent\BlockingCollection.cs
Web Access
Project: src\src\libraries\System.Collections.Concurrent\src\System.Collections.Concurrent.csproj (System.Collections.Concurrent)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BlockingCollection.cs
//
// A class that implements the bounding and blocking functionality while abstracting away
// the underlying storage mechanism. This file also contains BlockingCollection's
// associated debugger view type.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Runtime.Versioning;
using System.Threading;
 
namespace System.Collections.Concurrent
{
    /// <summary>
    /// Provides blocking and bounding capabilities for thread-safe collections that
    /// implement <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
    /// </summary>
    /// <remarks>
    /// <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/> represents a collection
    /// that allows for thread-safe adding and removing of data.
    /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> is used as a wrapper
    /// for an <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/> instance, allowing
    /// removal attempts from the collection to block until data is available to be removed.  Similarly,
    /// a <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> can be created to enforce
    /// an upper-bound on the number of data elements allowed in the
    /// <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/>; addition attempts to the
    /// collection may then block until space is available to store the added items.  In this manner,
    /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> is similar to a traditional
    /// blocking queue data structure, except that the underlying data storage mechanism is abstracted
    /// away as an <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
    /// </remarks>
    /// <typeparam name="T">Specifies the type of elements in the collection.</typeparam>
    [UnsupportedOSPlatform("browser")]
    [DebuggerTypeProxy(typeof(BlockingCollectionDebugView<>))]
    [DebuggerDisplay("Count = {Count}, Type = {_collection}")]
    public class BlockingCollection<T> : IEnumerable<T>, ICollection, IDisposable, IReadOnlyCollection<T>
    {
        private IProducerConsumerCollection<T> _collection;
        private int _boundedCapacity;
        private const int NON_BOUNDED = -1;
        private SemaphoreSlim? _freeNodes;
        private SemaphoreSlim _occupiedNodes;
        private bool _isDisposed;
        private CancellationTokenSource _consumersCancellationTokenSource;
        private CancellationTokenSource _producersCancellationTokenSource;
 
        private volatile int _currentAdders;
        private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);
 
        #region Properties
        /// <summary>Gets the bounded capacity of this <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</summary>
        /// <value>The bounded capacity of this collection, or -1 if no bound was supplied.</value>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public int BoundedCapacity
        {
            get
            {
                CheckDisposed();
                return _boundedCapacity;
            }
        }
 
        /// <summary>Gets whether this <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding.</summary>
        /// <value>Whether this collection has been marked as complete for adding.</value>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public bool IsAddingCompleted
        {
            get
            {
                CheckDisposed();
                return (_currentAdders == COMPLETE_ADDING_ON_MASK);
            }
        }
 
        /// <summary>Gets whether this <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding and is empty.</summary>
        /// <value>Whether this collection has been marked as complete for adding and is empty.</value>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public bool IsCompleted
        {
            get
            {
                CheckDisposed();
                return (IsAddingCompleted && (_occupiedNodes.CurrentCount == 0));
            }
        }
 
        /// <summary>Gets the number of items contained in the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <value>The number of items contained in the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.</value>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public int Count
        {
            get
            {
                CheckDisposed();
                return _occupiedNodes.CurrentCount;
            }
        }
 
        /// <summary>Gets a value indicating whether access to the <see cref="System.Collections.ICollection"/> is synchronized.</summary>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        bool ICollection.IsSynchronized
        {
            get
            {
                CheckDisposed();
                return false;
            }
        }
 
        /// <summary>
        /// Gets an object that can be used to synchronize access to the <see
        /// cref="System.Collections.ICollection"/>. This property is not supported.
        /// </summary>
        /// <exception cref="System.NotSupportedException">The SyncRoot property is not supported.</exception>
        object ICollection.SyncRoot
        {
            get
            {
                throw new NotSupportedException(SR.ConcurrentCollection_SyncRoot_NotSupported);
            }
        }
        #endregion
 
 
        /// <summary>Initializes a new instance of the
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class without an upper-bound.
        /// </summary>
        /// <remarks>
        /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
        /// </remarks>
        public BlockingCollection()
            : this(new ConcurrentQueue<T>())
        {
        }
 
        /// <summary>Initializes a new instance of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class with the specified upper-bound.
        /// </summary>
        /// <param name="boundedCapacity">The bounded size of the collection.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is
        /// not a positive value.</exception>
        /// <remarks>
        /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
        /// </remarks>
        public BlockingCollection(int boundedCapacity)
            : this(new ConcurrentQueue<T>(), boundedCapacity)
        {
        }
 
        /// <summary>Initializes a new instance of the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class with the specified upper-bound and using the provided
        /// <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <param name="boundedCapacity">The bounded size of the collection.</param>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collection"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is not a positive value.</exception>
        /// <exception cref="System.ArgumentException">The supplied <paramref name="collection"/> contains more values
        /// than is permitted by <paramref name="boundedCapacity"/>.</exception>
        public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
        {
            ArgumentNullException.ThrowIfNull(collection);
            ArgumentOutOfRangeException.ThrowIfNegativeOrZero(boundedCapacity);
 
            int count = collection.Count;
            if (count > boundedCapacity)
            {
                throw new ArgumentException(SR.BlockingCollection_ctor_CountMoreThanCapacity);
            }
            Initialize(collection, boundedCapacity, count);
        }
 
        /// <summary>Initializes a new instance of the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class without an upper-bound and using the provided
        /// <see cref="System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collection"/> argument is
        /// null.</exception>
        public BlockingCollection(IProducerConsumerCollection<T> collection)
        {
            ArgumentNullException.ThrowIfNull(collection);
 
            Initialize(collection, NON_BOUNDED, collection.Count);
        }
 
        /// <summary>Initializes the BlockingCollection instance.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <param name="boundedCapacity">The bounded size of the collection.</param>
        /// <param name="collectionCount">The number of items currently in the underlying collection.</param>
        [MemberNotNull(nameof(_collection))]
        [MemberNotNull(nameof(_consumersCancellationTokenSource))]
        [MemberNotNull(nameof(_producersCancellationTokenSource))]
        [MemberNotNull(nameof(_occupiedNodes))]
        private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
        {
            Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
 
            _collection = collection;
            _boundedCapacity = boundedCapacity;
            _isDisposed = false;
            _consumersCancellationTokenSource = new CancellationTokenSource();
            _producersCancellationTokenSource = new CancellationTokenSource();
 
            if (boundedCapacity == NON_BOUNDED)
            {
                _freeNodes = null;
            }
            else
            {
                Debug.Assert(boundedCapacity > 0);
                _freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
            }
 
 
            _occupiedNodes = new SemaphoreSlim(collectionCount);
        }
 
 
        /// <summary>
        /// Adds the item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        /// <remarks>
        /// If a bounded capacity was specified when this instance of
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
        /// a call to Add may block until space is available to store the provided item.
        /// </remarks>
        public void Add(T item)
        {
#if DEBUG
            bool tryAddReturnValue =
#endif
            TryAddWithNoTimeValidation(item, Timeout.Infinite, CancellationToken.None);
#if DEBUG
            Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
        }
 
        /// <summary>
        /// Adds the item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        /// <remarks>
        /// If a bounded capacity was specified when this instance of
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
        /// a call to <see cref="Add(T,System.Threading.CancellationToken)"/> may block until space is available to store the provided item.
        /// </remarks>
        public void Add(T item, CancellationToken cancellationToken)
        {
#if DEBUG
            bool tryAddReturnValue =
#endif
            TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
#if DEBUG
            Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
        }
 
        /// <summary>
        /// Attempts to add the specified item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <returns>true if the <paramref name="item"/> could be added; otherwise, false.</returns>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item)
        {
            return TryAddWithNoTimeValidation(item, 0, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);
            return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddWithNoTimeValidation(item, millisecondsTimeout, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.InvalidOperationException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
        }
 
        /// <summary>Adds an item into the underlying data store using its IProducerConsumerCollection&lt;T&gt;.Add
        /// method. If a bounded capacity was specified and the collection was full,
        /// this method will wait for, at most, the timeout period trying to add the item.
        /// If the timeout period was exhausted before successfully adding the item this method will
        /// return false.</summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to accept the item,
        /// or Timeout.Infinite to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>False if the collection remained full till the timeout period was exhausted.True otherwise.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.InvalidOperationException">the collection has already been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDisposed();
 
            cancellationToken.ThrowIfCancellationRequested();
 
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException(SR.BlockingCollection_Completed);
            }
 
            bool waitForSemaphoreWasSuccessful = true;
 
            if (_freeNodes != null)
            {
                //If the _freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
                //was called concurrently with Adding which is not supported by BlockingCollection.
                CancellationTokenSource? linkedTokenSource = null;
                try
                {
                    waitForSemaphoreWasSuccessful = _freeNodes.Wait(0, default);
                    if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                    {
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                            cancellationToken, _producersCancellationTokenSource.Token);
                        waitForSemaphoreWasSuccessful = _freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                    }
                }
                catch (OperationCanceledException)
                {
                    //if cancellation was via external token, throw an OCE
                    cancellationToken.ThrowIfCancellationRequested();
 
                    //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
                    //Debug.Assert(_ProducersCancellationTokenSource.Token.IsCancellationRequested);
 
                    throw new InvalidOperationException
                        (SR.BlockingCollection_Add_ConcurrentCompleteAdd);
                }
                finally
                {
                    linkedTokenSource?.Dispose();
                }
            }
            if (waitForSemaphoreWasSuccessful)
            {
                // Update the adders count if the complete adding was not requested, otherwise
                // spins until all adders finish then throw IOE
                // The idea behind to spin until all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
                // not been finished yet
                SpinWait spinner = default;
                while (true)
                {
                    int observedAdders = _currentAdders;
                    if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                    {
                        spinner.Reset();
                        // CompleteAdding is requested, spin then throw
                        while (_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                        throw new InvalidOperationException(SR.BlockingCollection_Completed);
                    }
 
                    if (Interlocked.CompareExchange(ref _currentAdders, observedAdders + 1, observedAdders) == observedAdders)
                    {
                        Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread exceeded the maximum limit.");
                        break;
                    }
                    spinner.SpinOnce(sleep1Threshold: -1);
                }
 
                //TryAdd is guaranteed to find a place to add the element. Its return value depends
                //on the semantics of the underlying store. Some underlying stores will not add an already
                //existing item and thus TryAdd returns false indicating that the size of the underlying
                //store did not increase.
                bool addingSucceeded = false;
 
                try
                {
                    //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                    //This fixes bug #702328, case 2 of 2.
                    cancellationToken.ThrowIfCancellationRequested();
                    addingSucceeded = _collection.TryAdd(item);
 
                    if (!addingSucceeded)
                        throw new InvalidOperationException(SR.BlockingCollection_Add_Failed);
                }
                finally
                {
                    if (addingSucceeded)
                        //After adding an element to the underlying storage, signal to the consumers
                        //waiting on _occupiedNodes that there is a new item added ready to be consumed.
                        _occupiedNodes.Release();
                    else
                        //TryAdd did not result in increasing the size of the underlying store and hence we need
                        //to increment back the count of the _freeNodes semaphore.
                        _freeNodes?.Release();
 
                    // decrement the adders count
                    Debug.Assert((_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
                    Interlocked.Decrement(ref _currentAdders);
                }
            }
            return waitForSemaphoreWasSuccessful;
        }
 
        /// <summary>Takes an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <returns>The item removed from the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance, or the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
        /// as complete with regards to additions.</exception>
        /// <remarks>A call to <see cref="Take()"/> may block until an item is available to be removed.</remarks>
        public T Take()
        {
            T? item;
 
            if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
            {
                throw new InvalidOperationException(SR.BlockingCollection_CantTakeWhenDone);
            }
 
            return item;
        }
 
        /// <summary>Takes an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <returns>The item removed from the collection.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is
        /// canceled.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance, or the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
        /// as complete with regards to additions.</exception>
        /// <remarks>A call to <see cref="Take(CancellationToken)"/> may block until an item is available to be removed.</remarks>
        public T Take(CancellationToken cancellationToken)
        {
            T? item;
 
            if (!TryTake(out item, Timeout.Infinite, cancellationToken))
            {
                throw new InvalidOperationException(SR.BlockingCollection_CantTakeWhenDone);
            }
 
            return item;
        }
 
        /// <summary>
        /// Attempts to remove an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <returns>true if an item could be removed; otherwise, false.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake([MaybeNullWhen(false)] out T item)
        {
            return TryTake(out item, 0, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to remove an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>true if an item could be removed from the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);
            return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
        }
 
        /// <summary>
        /// Attempts to remove an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>true if an item could be removed from the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
        }
 
        /// <summary>
        /// Attempts to remove an item from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>true if an item could be removed from the collection within
        /// the alloted time; otherwise, false.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
        }
 
        /// <summary>Takes an item from the underlying data store using its IProducerConsumerCollection&lt;T&gt;.Take
        /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
        /// trying to remove an item. If the timeout period was exhausted before successfully removing an item
        /// this method will return false.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to have an item available
        /// for removal, or Timeout.Infinite to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <param name="combinedTokenSource">A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token
        /// multiple times.</param>
        /// <returns>False if the collection remained empty till the timeout period was exhausted. True otherwise.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
        private bool TryTakeWithNoTimeValidation([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource? combinedTokenSource)
        {
            CheckDisposed();
            item = default(T)!;
 
            cancellationToken.ThrowIfCancellationRequested();
 
            //If the collection is completed then there is no need to wait.
            if (IsCompleted)
            {
                return false;
            }
            bool waitForSemaphoreWasSuccessful = false;
 
            // set the combined token source to the combinedToken parameter if it is not null (came from GetConsumingEnumerable)
            CancellationTokenSource? linkedTokenSource = combinedTokenSource;
            try
            {
                waitForSemaphoreWasSuccessful = _occupiedNodes.Wait(0);
                if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                {
                    // create the linked token if it is not created yet
                    linkedTokenSource ??= CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _consumersCancellationTokenSource.Token);
                    waitForSemaphoreWasSuccessful = _occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                }
            }
            //The collection became completed while waiting on the semaphore.
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                return false;
            }
            finally
            {
                // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it
                if (linkedTokenSource != null && combinedTokenSource == null)
                {
                    linkedTokenSource.Dispose();
                }
            }
 
            if (waitForSemaphoreWasSuccessful)
            {
                bool removeSucceeded = false;
                try
                {
                    //The token may have been canceled before an item arrived, so we need a check after the wait has completed.
                    //This fixes bug #702328, case 1 of 2.
                    cancellationToken.ThrowIfCancellationRequested();
 
                    //If an item was successfully removed from the underlying collection.
                    removeSucceeded = _collection.TryTake(out item);
                    if (!removeSucceeded)
                    {
                        // Check if the collection is empty which means that the collection was modified outside BlockingCollection
                        throw new InvalidOperationException
                            (SR.BlockingCollection_Take_CollectionModified);
                    }
                }
                finally
                {
                    // removeFaulted implies !removeSucceeded, but the reverse is not true.
                    if (removeSucceeded)
                    {
                        if (_freeNodes != null)
                        {
                            Debug.Assert(_boundedCapacity != NON_BOUNDED);
                            _freeNodes.Release();
                        }
                    }
                    else
                    {
                        _occupiedNodes.Release();
                    }
                    //Last remover will detect that it has actually removed the last item from the
                    //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
                    //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
                    //but this is not a problem.
                    if (IsCompleted)
                    {
                        CancelWaitingConsumers();
                    }
                }
            }
 
            return waitForSemaphoreWasSuccessful;
        }
 
 
 
        /// <summary>
        /// Adds the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>
        /// If a bounded capacity was specified when all of the
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
        /// a call to AddToAny may block until space is available in one of the collections
        /// to store the provided item.
        /// </remarks>
        public static int AddToAny(BlockingCollection<T>[] collections, T item)
        {
#if DEBUG
            int tryAddAnyReturnValue =
#else
            return
#endif
                TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);
#if DEBUG
            Debug.Assert(tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length,
                         "TryAddToAny() was expected to return an index within the bounds of the collections array.");
            return tryAddAnyReturnValue;
#endif
        }
 
        /// <summary>
        /// Adds the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>
        /// If a bounded capacity was specified when all of the
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
        /// a call to AddToAny may block until space is available in one of the collections
        /// to store the provided item.
        /// </remarks>
        public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
        {
#if DEBUG
            int tryAddAnyReturnValue =
#else
            return
#endif
                TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
#if DEBUG
            Debug.Assert(tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length,
                         "TryAddToAny() was expected to return an index within the bounds of the collections array.");
            return tryAddAnyReturnValue;
#endif
        }
 
        /// <summary>
        /// Attempts to add the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/>
        /// array to which the item was added, or -1 if the item could not be added.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
        {
            return TryAddToAny(collections, item, 0, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>The index of the collection in the <paramref name="collections"/>
        /// array to which the item was added, or -1 if the item could not be added.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);
            return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>        /// <returns>The index of the collection in the <paramref name="collections"/>
        /// array to which the item was added, or -1 if the item could not be added.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to add the specified item to any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item to be added to one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/>
        /// array to which the item was added, or -1 if the item could not be added.</returns>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
        }
 
        /// <summary>Adds an item to anyone of the specified collections.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The collections into which the item can be added.</param>
        /// <param name="item">The item to be added .</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.</param>
        /// <param name="externalCancellationToken">A cancellation token to observe.</param>
        /// <returns>The index into collections for the collection which accepted the
        /// adding of the item; -1 if the item could not be added.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
        {
            ValidateCollectionsArray(collections, true);
            const int OPERATION_FAILED = -1;
 
            // Copy the wait time to another local variable to update it
            int timeout = millisecondsTimeout;
 
            uint startTime = 0;
            if (millisecondsTimeout != Timeout.Infinite)
            {
                startTime = unchecked((uint)Environment.TickCount);
            }
 
            // Fast path for adding if there is at least one unbounded collection
            int index = TryAddToAnyFast(collections, item);
            if (index > -1)
                return index;
 
 
            // Get wait handles and the tokens for all collections,
            // and construct a single combined token from all the tokens,
            // add the combined token handle to the handles list
            // call WaitAny for all handles
            // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
            // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
            // or one if the collection is AddingCompleted then throw AE
            CancellationToken[] collatedCancellationTokens;
            List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);
 
            //Loop until one of these conditions is met:
            // 1- The operation is succeeded
            // 2- The timeout expired for try* versions
            // 3- The external token is cancelled, throw
            // 4- There is at least one collection marked as adding completed then throw
            while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
            {
                index = -1;
 
                using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
                {
                    handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
 
                    //Wait for any collection to become available.
                    index = WaitHandle.WaitAny(handles.ToArray(), timeout);
 
                    handles.RemoveAt(handles.Count - 1); //remove the linked token
 
                    if (linkedTokenSource.IsCancellationRequested)
                    {
                        externalCancellationToken.ThrowIfCancellationRequested(); //case#3
                        throw new ArgumentException(SR.BlockingCollection_CantAddAnyWhenCompleted, nameof(collections)); //case#4
                    }
                }
 
                Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
 
                if (index == WaitHandle.WaitTimeout) //case#2
                    return OPERATION_FAILED;
 
                //If the timeout period was not exhausted and the appropriate operation succeeded.
                if (collections[index].TryAdd(item)) //case#1
                    return index;
 
                // Update the timeout
                if (millisecondsTimeout != Timeout.Infinite)
                    timeout = UpdateTimeOut(startTime, millisecondsTimeout);
            }
 
            // case #2
            return OPERATION_FAILED;
        }
 
        /// <summary>
        /// Fast path for TryAddToAny to find a non bounded collection and add the items in it
        /// </summary>
        /// <param name="collections">The collections list</param>
        /// <param name="item">The item to be added</param>
        /// <returns>The index which the item has been added, -1 if failed</returns>
        private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
        {
            for (int i = 0; i < collections.Length; i++)
            {
                if (collections[i]._freeNodes == null)
                {
#if DEBUG
                    bool result =
#endif
                    collections[i].TryAdd(item);
#if DEBUG
                    Debug.Assert(result);
#endif
                    return i;
                }
            }
            return -1;
        }
        /// <summary>
        /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Completed collections
        /// </summary>
        /// <param name="collections">The blocking collections</param>
        /// <param name="externalCancellationToken">The original CancellationToken</param>
        /// <param name="isAddOperation">True if Add or TryAdd, false if Take or TryTake</param>
        /// <param name="cancellationTokens">Complete list of cancellationTokens to observe</param>
        /// <returns>The collections wait handles</returns>
        private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
        {
            Debug.Assert(collections != null);
            List<WaitHandle> handlesList = new List<WaitHandle>(collections.Length + 1); // + 1 for the external token handle to be added
            List<CancellationToken> tokensList = new List<CancellationToken>(collections.Length + 1); // + 1 for the external token
            tokensList.Add(externalCancellationToken);
 
            // Read the appropriate WaitHandle based on the operation mode.
            if (isAddOperation)
            {
                for (int i = 0; i < collections.Length; i++)
                {
                    BlockingCollection<T> c = collections[i];
                    if (c._freeNodes != null)
                    {
                        handlesList.Add(c._freeNodes.AvailableWaitHandle);
                        tokensList.Add(c._producersCancellationTokenSource.Token);
                    }
                }
            }
            else
            {
                for (int i = 0; i < collections.Length; i++)
                {
                    if (collections[i].IsCompleted) //exclude Completed collections if it is take operation
                        continue;
 
                    handlesList.Add(collections[i]._occupiedNodes.AvailableWaitHandle);
                    tokensList.Add(collections[i]._consumersCancellationTokenSource.Token);
                }
            }
 
            cancellationTokens = tokensList.ToArray();
            return handlesList;
        }
 
        /// <summary>
        /// Helper function to measure and update the wait time
        /// </summary>
        /// <param name="startTime"> The first time (in milliseconds) observed when the wait started</param>
        /// <param name="originalWaitMillisecondsTimeout">The original wait timeoutout in milliseconds</param>
        /// <returns>The new wait time in milliseconds, -1 if the time expired</returns>
        private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
        {
            if (originalWaitMillisecondsTimeout == 0)
            {
                return 0;
            }
            // The function must be called in case the time out is not infinite
            Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);
 
            uint elapsedMilliseconds = unchecked((uint)Environment.TickCount - startTime);
 
            // Check the elapsed milliseconds is greater than max int because this property is uint
            if (elapsedMilliseconds > int.MaxValue)
            {
                return 0;
            }
 
            // Subtract the elapsed time from the current wait time
            int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds;
            if (currentWaitTimeout <= 0)
            {
                return 0;
            }
 
            return currentWaitTimeout;
        }
        /// <summary>
        /// Takes an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
        public static int TakeFromAny(BlockingCollection<T>[] collections, out T? item)
        {
            return TakeFromAny(collections, out item, CancellationToken.None);
        }
 
        /// <summary>
        /// Takes an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
        public static int TakeFromAny(BlockingCollection<T>[] collections, out T? item, CancellationToken cancellationToken)
        {
            int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);
            Debug.Assert(returnValue >= 0 && returnValue < collections.Length,
                         "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
            return returnValue;
        }
 
        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T? item)
        {
            return TryTakeFromAny(collections, out item, 0);
        }
 
        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T? item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);
            return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T? item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
        }
 
        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="System.ObjectDisposedException">At least one of the <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T? item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
        }
 
        /// <summary>Takes an item from anyone of the specified collections.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The collections from which the item can be removed.</param>
        /// <param name="item">The item removed and returned to the caller.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.</param>
        /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
        /// <param name="externalCancellationToken">A cancellation token to observe.</param>
        /// <returns>The index into collections for the collection which accepted the
        /// removal of the item; -1 if the item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T? item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
        {
            ValidateCollectionsArray(collections, false);
 
            //try the fast path first
            for (int i = 0; i < collections.Length; i++)
            {
                // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
                if (!collections[i].IsCompleted && collections[i]._occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
                    return i;
            }
 
            //Fast path failed, try the slow path
            return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
        }
 
 
        /// <summary>Takes an item from anyone of the specified collections.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The collections copy from which the item can be removed.</param>
        /// <param name="item">The item removed and returned to the caller.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.</param>
        /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
        /// <param name="externalCancellationToken">A cancellation token to observe.</param>
        /// <returns>The index into collections for the collection which accepted the
        /// removal of the item; -1 if the item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T? item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
        {
            const int OPERATION_FAILED = -1;
 
            // Copy the wait time to another local variable to update it
            int timeout = millisecondsTimeout;
 
            uint startTime = 0;
            if (millisecondsTimeout != Timeout.Infinite)
            {
                startTime = unchecked((uint)Environment.TickCount);
            }
 
 
            //Loop until one of these conditions is met:
            // 1- The operation is succeeded
            // 2- The timeout expired for try* versions
            // 3- The external token is cancelled, throw
            // 4- The operation is TryTake and all collections are marked as completed, return false
            // 5- The operation is Take and all collection are marked as completed, throw
            while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
            {
                // Get wait handles and the tokens for all collections,
                // and construct a single combined token from all the tokens,
                // add the combined token handle to the handles list
                // call WaitAny for all handles
                // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
                // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
                // or one if the collection is Completed then exclude it and retry
                CancellationToken[] collatedCancellationTokens;
                List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);
 
                if (handles.Count == 0 && isTakeOperation) //case#5
                    throw new ArgumentException(SR.BlockingCollection_CantTakeAnyWhenAllDone, nameof(collections));
 
                if (handles.Count == 0) //case#4
                    break;
 
                //Wait for any collection to become available.
                using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
                {
                    handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
                    int index = WaitHandle.WaitAny(handles.ToArray(), timeout);
 
                    if (linkedTokenSource.IsCancellationRequested)
                    {
                        externalCancellationToken.ThrowIfCancellationRequested(); //case#3
                    }
 
                    if (!linkedTokenSource.IsCancellationRequested) // if neither internal nor external cancellation requested
                    {
                        Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
                        if (index == WaitHandle.WaitTimeout) //case#2
                            break;
 
                        // adjust the index in case one or more handles removed because they are completed
                        if (collections.Length != handles.Count - 1) // -1 because of the combined token handle
                        {
                            for (int i = 0; i < collections.Length; i++)
                            {
                                if (collections[i]._occupiedNodes.AvailableWaitHandle == handles[index])
                                {
                                    index = i;
                                    break;
                                }
                            }
                        }
 
                        if (collections[index].TryTake(out item)) //case#1
                            return index;
                    }
                }
 
                // Update the timeout
                if (millisecondsTimeout != Timeout.Infinite)
                    timeout = UpdateTimeOut(startTime, millisecondsTimeout);
            }
 
            item = default(T)!; //case#2
            return OPERATION_FAILED;
        }
 
        /// <summary>
        /// Marks the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instances
        /// as not accepting any more additions.
        /// </summary>
        /// <remarks>
        /// After a collection has been marked as complete for adding, adding to the collection is not permitted
        /// and attempts to remove from the collection will not wait when the collection is empty.
        /// </remarks>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public void CompleteAdding()
        {
            CheckDisposed();
 
            if (IsAddingCompleted)
                return;
 
            SpinWait spinner = default;
            while (true)
            {
                int observedAdders = _currentAdders;
                if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                {
                    spinner.Reset();
                    // If there is another CompleteAdding in progress waiting the current adders, then spin until it finishes
                    while (_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                    return;
                }
 
                if (Interlocked.CompareExchange(ref _currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
                {
                    spinner.Reset();
                    while (_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
 
                    if (Count == 0)
                    {
                        CancelWaitingConsumers();
                    }
 
                    // We should always wake waiting producers, and have them throw exceptions as
                    // Add&CompleteAdding should not be used concurrently.
                    CancelWaitingProducers();
                    return;
                }
                spinner.SpinOnce(sleep1Threshold: -1);
            }
        }
 
        /// <summary>Cancels the semaphores.</summary>
        private void CancelWaitingConsumers()
        {
            _consumersCancellationTokenSource.Cancel();
        }
 
        private void CancelWaitingProducers()
        {
            _producersCancellationTokenSource.Cancel();
        }
 
 
        /// <summary>
        /// Releases resources used by the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        /// <summary>
        /// Releases resources used by the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance.
        /// </summary>
        /// <param name="disposing">Whether being disposed explicitly (true) or due to a finalizer (false).</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!_isDisposed)
            {
                _freeNodes?.Dispose();
                _occupiedNodes.Dispose();
 
                _isDisposed = true;
            }
        }
 
        /// <summary>Copies the items from the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance into a new array.</summary>
        /// <returns>An array containing copies of the elements of the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <remarks>
        /// The copied elements are not removed from the collection.
        /// </remarks>
        public T[] ToArray()
        {
            CheckDisposed();
            return _collection.ToArray();
        }
 
        /// <summary>Copies all of the items in the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance
        /// to a compatible one-dimensional array, starting at the specified index of the target array.
        /// </summary>
        /// <param name="array">The one-dimensional array that is the destination of the elements copied from
        /// the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
        /// <exception cref="System.ArgumentNullException">The <paramref name="array"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
        /// than the length of the <paramref name="array"/>.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public void CopyTo(T[] array, int index)
        {
            ((ICollection)this).CopyTo(array, index);
        }
 
        /// <summary>Copies all of the items in the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance
        /// to a compatible one-dimensional array, starting at the specified index of the target array.
        /// </summary>
        /// <param name="array">The one-dimensional array that is the destination of the elements copied from
        /// the <see cref="System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
        /// <exception cref="System.ArgumentNullException">The <paramref name="array"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
        /// than the length of the <paramref name="array"/>, the array is multidimensional, or the type parameter for the collection
        /// cannot be cast automatically to the type of the destination array.</exception>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        void ICollection.CopyTo(Array array, int index)
        {
            CheckDisposed();
 
            //We don't call _collection.CopyTo() directly because we rely on Array.Copy method to customize
            //all array exceptions.
            T[] collectionSnapShot = _collection.ToArray();
 
            try
            {
                Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
            }
            catch (ArgumentNullException)
            {
                throw new ArgumentNullException(nameof(array));
            }
            catch (ArgumentOutOfRangeException)
            {
                throw new ArgumentOutOfRangeException(nameof(index), index, SR.BlockingCollection_CopyTo_NonNegative);
            }
            catch (ArgumentException)
            {
                throw new ArgumentException(SR.Collection_CopyTo_TooManyElems, nameof(index));
            }
            catch (RankException)
            {
                throw new ArgumentException(SR.BlockingCollection_CopyTo_MultiDim, nameof(array));
            }
            catch (InvalidCastException)
            {
                throw new ArgumentException(SR.BlockingCollection_CopyTo_IncorrectType, nameof(array));
            }
            catch (ArrayTypeMismatchException)
            {
                throw new ArgumentException(SR.BlockingCollection_CopyTo_IncorrectType, nameof(array));
            }
        }
 
        /// <summary>Provides a consuming <see cref="System.Collections.Generic.IEnumerable{T}"/> for items in the collection.</summary>
        /// <returns>An <see cref="System.Collections.Generic.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public IEnumerable<T> GetConsumingEnumerable()
        {
            return GetConsumingEnumerable(CancellationToken.None);
        }
 
        /// <summary>Provides a consuming <see cref="System.Collections.Generic.IEnumerable{T}"/> for items in the collection.
        /// Calling MoveNext on the returned enumerable will block if there is no data available, or will
        /// throw an <see cref="System.OperationCanceledException"/> if the <see cref="CancellationToken"/> is canceled.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>An <see cref="System.Collections.Generic.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
        {
            if (IsCompleted)
            {
                yield break;
            }
 
            using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _consumersCancellationTokenSource.Token);
            while (TryTakeWithNoTimeValidation(out T? item, Timeout.Infinite, cancellationToken, linkedTokenSource))
            {
                yield return item;
            }
        }
 
        /// <summary>Provides an <see cref="System.Collections.Generic.IEnumerator{T}"/> for items in the collection.</summary>
        /// <returns>An <see cref="System.Collections.Generic.IEnumerator{T}"/> for the items in the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            CheckDisposed();
            return _collection.GetEnumerator();
        }
 
        /// <summary>Provides an <see cref="System.Collections.IEnumerator"/> for items in the collection.</summary>
        /// <returns>An <see cref="System.Collections.IEnumerator"/> for the items in the collection.</returns>
        /// <exception cref="System.ObjectDisposedException">The <see
        /// cref="System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }
 
        /// <summary>Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
        /// and TryTakeFromAny().</summary>
        /// <param name="collections">The collections to/from which an item should be added/removed.</param>
        /// <param name="isAddOperation">Indicates whether this method is called to Add or Take.</param>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
        {
            ArgumentNullException.ThrowIfNull(collections);
 
            if (collections.Length < 1)
            {
                throw new ArgumentException(
                    SR.BlockingCollection_ValidateCollectionsArray_ZeroSize, nameof(collections));
            }
            if ((collections.Length > 63) ||
                ((collections.Length == 63) && (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA)))
            {
                //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, as we reserve one for CancellationToken
                throw new ArgumentOutOfRangeException(
                    nameof(collections), SR.BlockingCollection_ValidateCollectionsArray_LargeSize);
            }
 
            for (int i = 0; i < collections.Length; ++i)
            {
                if (collections[i] == null)
                {
                    throw new ArgumentException(
                        SR.BlockingCollection_ValidateCollectionsArray_NullElems, nameof(collections));
                }
 
                if (collections[i]._isDisposed)
                    throw new ObjectDisposedException(
                        nameof(collections), SR.BlockingCollection_ValidateCollectionsArray_DispElems);
 
                if (isAddOperation && collections[i].IsAddingCompleted)
                {
                    throw new ArgumentException(
                        SR.BlockingCollection_CantAddAnyWhenCompleted, nameof(collections));
                }
            }
        }
 
        // ---------
        // Private Helpers.
        /// <summary>Centralizes the logic of validating the timeout input argument.</summary>
        /// <param name="timeout">The TimeSpan to wait for to successfully complete an operation on the collection.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">If the number of milliseconds represented by the timeout
        /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite</exception>
        private static void ValidateTimeout(TimeSpan timeout)
        {
            long totalMilliseconds = (long)timeout.TotalMilliseconds;
            if ((totalMilliseconds < 0 || totalMilliseconds > int.MaxValue) && (totalMilliseconds != Timeout.Infinite))
            {
                throw new ArgumentOutOfRangeException(nameof(timeout), timeout,
                    SR.Format(CultureInfo.InvariantCulture, SR.BlockingCollection_TimeoutInvalid, int.MaxValue));
            }
        }
 
        /// <summary>Centralizes the logic of validating the millisecondsTimeout input argument.</summary>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for to successfully complete an
        /// operation on the collection.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">If the number of milliseconds is less than 0 and not
        /// equal to Timeout.Infinite.</exception>
        private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
        {
            if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
            {
                throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout), millisecondsTimeout,
                    SR.Format(CultureInfo.InvariantCulture, SR.BlockingCollection_TimeoutInvalid, int.MaxValue));
            }
        }
 
        /// <summary>Throws a System.ObjectDisposedException if the collection was disposed</summary>
        /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
        private void CheckDisposed()
        {
            ObjectDisposedException.ThrowIf(_isDisposed, this);
        }
    }
 
    /// <summary>A debugger view of the blocking collection that makes it simple to browse the
    /// collection's contents at a point in time.</summary>
    /// <typeparam name="T">The type of element that the BlockingCollection will hold.</typeparam>
    internal sealed class BlockingCollectionDebugView<T>
    {
        private readonly BlockingCollection<T> _blockingCollection; // The collection being viewed.
 
        /// <summary>Constructs a new debugger view object for the provided blocking collection object.</summary>
        /// <param name="collection">A blocking collection to browse in the debugger.</param>
        public BlockingCollectionDebugView(BlockingCollection<T> collection)
        {
            ArgumentNullException.ThrowIfNull(collection);
 
            _blockingCollection = collection;
        }
 
        /// <summary>Returns a snapshot of the underlying collection's elements.</summary>
        [UnsupportedOSPlatform("browser")]
        [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
        public T[] Items
        {
            get
            {
                return _blockingCollection.ToArray();
            }
        }
    }
}