File: System\Threading\Channels\BoundedChannel.cs
Web Access
Project: src\src\libraries\System.Threading.Channels\src\System.Threading.Channels.csproj (System.Threading.Channels)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
 
namespace System.Threading.Channels
{
    /// <summary>Provides a channel with a bounded capacity.</summary>
    [DebuggerDisplay("Items = {ItemsCountForDebugger}, Capacity = {_bufferedCapacity}, Mode = {_mode}, Closed = {ChannelIsClosedForDebugger}")]
    [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
    internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
    {
        /// <summary>The mode used when the channel hits its bound.</summary>
        private readonly BoundedChannelFullMode _mode;
        /// <summary>The delegate that will be invoked when the channel hits its bound and an item is dropped from the channel.</summary>
        private readonly Action<T>? _itemDropped;
        /// <summary>Task signaled when the channel has completed.</summary>
        private readonly TaskCompletionSource _completion;
        /// <summary>The maximum capacity of the channel.</summary>
        private readonly int _bufferedCapacity;
        /// <summary>Items currently stored in the channel waiting to be read.</summary>
        private readonly Deque<T> _items = new Deque<T>();
        /// <summary>Readers waiting to read from the channel.</summary>
        private readonly Deque<AsyncOperation<T>> _blockedReaders = new Deque<AsyncOperation<T>>();
        /// <summary>Writers waiting to write to the channel.</summary>
        private readonly Deque<VoidAsyncOperationWithData<T>> _blockedWriters = new Deque<VoidAsyncOperationWithData<T>>();
        /// <summary>Linked list of WaitToReadAsync waiters.</summary>
        private AsyncOperation<bool>? _waitingReadersTail;
        /// <summary>Linked list of WaitToWriteAsync waiters.</summary>
        private AsyncOperation<bool>? _waitingWritersTail;
        /// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
        private readonly bool _runContinuationsAsynchronously;
        /// <summary>Set to non-null once Complete has been called.</summary>
        private Exception? _doneWriting;
        /// <summary>Gets an object used to synchronize all state on the instance.</summary>
        private object SyncObj => _items;
 
        /// <summary>Initializes the <see cref="BoundedChannel{T}"/>.</summary>
        /// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
        /// <param name="mode">The mode used when writing to a full channel.</param>
        /// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
        /// <param name="itemDropped">Delegate that will be invoked when an item is dropped from the channel. See <see cref="BoundedChannelFullMode"/>.</param>
        internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action<T>? itemDropped)
        {
            Debug.Assert(bufferedCapacity > 0);
            _bufferedCapacity = bufferedCapacity;
            _mode = mode;
            _runContinuationsAsynchronously = runContinuationsAsynchronously;
            _itemDropped = itemDropped;
            _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
            Reader = new BoundedChannelReader(this);
            Writer = new BoundedChannelWriter(this);
        }
 
        [DebuggerDisplay("Items = {ItemsCountForDebugger}")]
        [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
        private sealed class BoundedChannelReader : ChannelReader<T>, IDebugEnumerable<T>
        {
            internal readonly BoundedChannel<T> _parent;
            private readonly AsyncOperation<T> _readerSingleton;
            private readonly AsyncOperation<bool> _waiterSingleton;
 
            internal BoundedChannelReader(BoundedChannel<T> parent)
            {
                _parent = parent;
                _readerSingleton = new AsyncOperation<T>(parent._runContinuationsAsynchronously, pooled: true);
                _waiterSingleton = new AsyncOperation<bool>(parent._runContinuationsAsynchronously, pooled: true);
            }
 
            public override Task Completion => _parent._completion.Task;
 
            public override bool CanCount => true;
 
            public override bool CanPeek => true;
 
            public override int Count
            {
                get
                {
                    BoundedChannel<T> parent = _parent;
                    lock (parent.SyncObj)
                    {
                        parent.AssertInvariants();
                        return parent._items.Count;
                    }
                }
            }
 
            /// <summary>Gets the number of items in the channel. This should only be used by the debugger.</summary>
            /// <remarks>
            /// Unlike <see cref="Count"/>, avoids locking so as to not block the debugger if another suspended thread is holding the lock.
            /// Hence, this must only be used from the debugger in a serialized context.
            /// </remarks>
            private int ItemsCountForDebugger => _parent._items.Count;
 
            public override bool TryRead([MaybeNullWhen(false)] out T item)
            {
                BoundedChannel<T> parent = _parent;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // Get an item if there is one.
                    if (!parent._items.IsEmpty)
                    {
                        item = DequeueItemAndPostProcess();
                        return true;
                    }
                }
 
                item = default;
                return false;
            }
 
            public override bool TryPeek([MaybeNullWhen(false)] out T item)
            {
                BoundedChannel<T> parent = _parent;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // Peek at an item if there is one.
                    if (!parent._items.IsEmpty)
                    {
                        item = parent._items.PeekHead();
                        return true;
                    }
                }
 
                item = default;
                return false;
            }
 
            public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
                }
 
                BoundedChannel<T> parent = _parent;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // If there are any items, hand one back.
                    if (!parent._items.IsEmpty)
                    {
                        return new ValueTask<T>(DequeueItemAndPostProcess());
                    }
 
                    // There weren't any items.  If we're done writing so that there
                    // will never be more items, fail.
                    if (parent._doneWriting != null)
                    {
                        return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
                    }
 
                    // If we're able to use the singleton reader, do so.
                    if (!cancellationToken.CanBeCanceled)
                    {
                        AsyncOperation<T> singleton = _readerSingleton;
                        if (singleton.TryOwnAndReset())
                        {
                            parent._blockedReaders.EnqueueTail(singleton);
                            return singleton.ValueTaskOfT;
                        }
                    }
 
                    // Otherwise, queue a reader.  Note that in addition to checking whether synchronous continuations were requested,
                    // we also check whether the supplied cancellation token can be canceled.  The writer calls UnregisterCancellation
                    // while holding the lock, and if a callback needs to be unregistered and is currently running, it needs to wait
                    // for that callback to complete so that the subsequent code knows it won't be contending with another thread
                    // trying to complete the operation.  However, if we allowed a synchronous continuation from this operation, that
                    // cancellation callback could end up running arbitrary code, including code that called back into the reader or
                    // writer and tried to take the same lock held by the thread running UnregisterCancellation... deadlock.  As such,
                    // we only allow synchronous continuations here if both a) the caller requested it and the token isn't cancelable.
                    var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
                    parent._blockedReaders.EnqueueTail(reader);
                    return reader.ValueTaskOfT;
                }
            }
 
            public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
                }
 
                BoundedChannel<T> parent = _parent;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // If there are any items available, a read is possible.
                    if (!parent._items.IsEmpty)
                    {
                        return new ValueTask<bool>(true);
                    }
 
                    // There were no items available, so if we're done writing, a read will never be possible.
                    if (parent._doneWriting != null)
                    {
                        return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
                            new ValueTask<bool>(Task.FromException<bool>(parent._doneWriting)) :
                            default;
                    }
 
                    // There were no items available, but there could be in the future, so ensure
                    // there's a blocked reader task and return it.
 
                    // If we're able to use the singleton waiter, do so.
                    if (!cancellationToken.CanBeCanceled)
                    {
                        AsyncOperation<bool> singleton = _waiterSingleton;
                        if (singleton.TryOwnAndReset())
                        {
                            ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton);
                            return singleton.ValueTaskOfT;
                        }
                    }
 
                    // Otherwise, queue a reader.  Note that in addition to checking whether synchronous continuations were requested,
                    // we also check whether the supplied cancellation token can be canceled.  The writer calls UnregisterCancellation
                    // while holding the lock, and if a callback needs to be unregistered and is currently running, it needs to wait
                    // for that callback to complete so that the subsequent code knows it won't be contending with another thread
                    // trying to complete the operation.  However, if we allowed a synchronous continuation from this operation, that
                    // cancellation callback could end up running arbitrary code, including code that called back into the reader or
                    // writer and tried to take the same lock held by the thread running UnregisterCancellation... deadlock.  As such,
                    // we only allow synchronous continuations here if both a) the caller requested it and the token isn't cancelable.
                    var waiter = new AsyncOperation<bool>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
                    ChannelUtilities.QueueWaiter(ref _parent._waitingReadersTail, waiter);
                    return waiter.ValueTaskOfT;
                }
            }
 
            /// <summary>Dequeues an item, and then fixes up our state around writers and completion.</summary>
            /// <returns>The dequeued item.</returns>
            private T DequeueItemAndPostProcess()
            {
                BoundedChannel<T> parent = _parent;
                Debug.Assert(Monitor.IsEntered(parent.SyncObj));
 
                // Dequeue an item.
                T item = parent._items.DequeueHead();
 
                if (parent._doneWriting != null)
                {
                    // We're done writing, so if we're now empty, complete the channel.
                    if (parent._items.IsEmpty)
                    {
                        ChannelUtilities.Complete(parent._completion, parent._doneWriting);
                    }
                }
                else
                {
                    // If there are any writers blocked, there's now room for at least one
                    // to be promoted to have its item moved into the items queue.  We need
                    // to loop while trying to complete the writer in order to find one that
                    // hasn't yet been canceled (canceled writers transition to canceled but
                    // remain in the physical queue).
                    //
                    // (It's possible for _doneWriting to be non-null due to Complete
                    // having been called but for there to still be blocked/waiting writers.
                    // This is a temporary condition, after which Complete has set _doneWriting
                    // and then exited the lock; at that point it'll proceed to clean this up,
                    // so we just ignore them.)
 
                    while (!parent._blockedWriters.IsEmpty)
                    {
                        VoidAsyncOperationWithData<T> w = parent._blockedWriters.DequeueHead();
                        if (w.TrySetResult(default))
                        {
                            parent._items.EnqueueTail(w.Item!);
                            return item;
                        }
                    }
 
                    // There was no blocked writer, so see if there's a WaitToWriteAsync
                    // we should wake up.
                    ChannelUtilities.WakeUpWaiters(ref parent._waitingWritersTail, result: true);
                }
 
                // Return the item
                return item;
            }
 
            /// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary>
            IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _parent._items.GetEnumerator();
        }
 
        [DebuggerDisplay("Items = {ItemsCountForDebugger}, Capacity = {CapacityForDebugger}")]
        [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
        private sealed class BoundedChannelWriter : ChannelWriter<T>, IDebugEnumerable<T>
        {
            internal readonly BoundedChannel<T> _parent;
            private readonly VoidAsyncOperationWithData<T> _writerSingleton;
            private readonly AsyncOperation<bool> _waiterSingleton;
 
            internal BoundedChannelWriter(BoundedChannel<T> parent)
            {
                _parent = parent;
                _writerSingleton = new VoidAsyncOperationWithData<T>(runContinuationsAsynchronously: true, pooled: true);
                _waiterSingleton = new AsyncOperation<bool>(runContinuationsAsynchronously: true, pooled: true);
            }
 
            public override bool TryComplete(Exception? error)
            {
                BoundedChannel<T> parent = _parent;
                bool completeTask;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // If we've already marked the channel as completed, bail.
                    if (parent._doneWriting != null)
                    {
                        return false;
                    }
 
                    // Mark that we're done writing.
                    parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
                    completeTask = parent._items.IsEmpty;
                }
 
                // If there are no items in the queue, complete the channel's task,
                // as no more data can possibly arrive at this point.  We do this outside
                // of the lock in case we'll be running synchronous completions, and we
                // do it before completing blocked/waiting readers, so that when they
                // wake up they'll see the task as being completed.
                if (completeTask)
                {
                    ChannelUtilities.Complete(parent._completion, error);
                }
 
                // At this point, _blockedReaders/Writers and _waitingReaders/Writers will not be mutated:
                // they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null.
                // We also know that only one thread (this one) will ever get here, as only that thread
                // will be the one to transition from _doneWriting false to true.  As such, we can
                // freely manipulate them without any concurrency concerns.
                ChannelUtilities.FailOperations<AsyncOperation<T>, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error));
                ChannelUtilities.FailOperations<VoidAsyncOperationWithData<T>, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error));
                ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error);
                ChannelUtilities.WakeUpWaiters(ref parent._waitingWritersTail, result: false, error: error);
 
                // Successfully transitioned to completed.
                return true;
            }
 
            public override bool TryWrite(T item)
            {
                AsyncOperation<T>? blockedReader = null;
                AsyncOperation<bool>? waitingReadersTail = null;
 
                BoundedChannel<T> parent = _parent;
 
                bool releaseLock = false;
                try
                {
                    Monitor.Enter(parent.SyncObj, ref releaseLock);
 
                    parent.AssertInvariants();
 
                    // If we're done writing, nothing more to do.
                    if (parent._doneWriting != null)
                    {
                        return false;
                    }
 
                    // Get the number of items in the channel currently.
                    int count = parent._items.Count;
 
                    if (count == 0)
                    {
                        // There are no items in the channel, which means we may have blocked/waiting readers.
 
                        // If there are any blocked readers, find one that's not canceled
                        // and store it to complete outside of the lock, in case it has
                        // continuations that'll run synchronously
                        while (!parent._blockedReaders.IsEmpty)
                        {
                            AsyncOperation<T> r = parent._blockedReaders.DequeueHead();
                            if (r.UnregisterCancellation()) // ensure that once we grab it, we own its completion
                            {
                                blockedReader = r;
                                break;
                            }
                        }
 
                        if (blockedReader == null)
                        {
                            // If there wasn't a blocked reader, then store the item. If no one's waiting
                            // to be notified about a 0-to-1 transition, we're done.
                            parent._items.EnqueueTail(item);
                            waitingReadersTail = parent._waitingReadersTail;
                            if (waitingReadersTail == null)
                            {
                                return true;
                            }
                            parent._waitingReadersTail = null;
                        }
                    }
                    else if (count < parent._bufferedCapacity)
                    {
                        // There's room in the channel.  Since we're not transitioning from 0-to-1 and
                        // since there's room, we can simply store the item and exit without having to
                        // worry about blocked/waiting readers.
                        parent._items.EnqueueTail(item);
                        return true;
                    }
                    else if (parent._mode == BoundedChannelFullMode.Wait)
                    {
                        // The channel is full and we're in a wait mode.
                        // Simply exit and let the caller know we didn't write the data.
                        return false;
                    }
                    else if (parent._mode == BoundedChannelFullMode.DropWrite)
                    {
                        // The channel is full.  Just ignore the item being added
                        // but say we added it.
                        Monitor.Exit(parent.SyncObj);
                        releaseLock = false;
                        parent._itemDropped?.Invoke(item);
                        return true;
                    }
                    else
                    {
                        // The channel is full, and we're in a dropping mode.
                        // Drop either the oldest or the newest
                        T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
                            parent._items.DequeueTail() :
                            parent._items.DequeueHead();
 
                        parent._items.EnqueueTail(item);
 
                        Monitor.Exit(parent.SyncObj);
                        releaseLock = false;
                        parent._itemDropped?.Invoke(droppedItem);
 
                        return true;
                    }
                }
                finally
                {
                    if (releaseLock)
                    {
                        Monitor.Exit(parent.SyncObj);
                    }
                }
 
                // We either wrote the item already, or we're transferring it to the blocked reader we grabbed.
                if (blockedReader != null)
                {
                    Debug.Assert(waitingReadersTail == null, "Shouldn't have any waiters to wake up");
 
                    // Transfer the written item to the blocked reader.
                    bool success = blockedReader.TrySetResult(item);
                    Debug.Assert(success, "We should always be able to complete the reader.");
                }
                else
                {
                    // We stored an item bringing the count up from 0 to 1.  Alert
                    // any waiting readers that there may be something for them to consume.
                    // Since we're no longer holding the lock, it's possible we'll end up
                    // waking readers that have since come in.
                    ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true);
                }
 
                return true;
            }
 
            public override ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
                }
 
                BoundedChannel<T> parent = _parent;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();
 
                    // If we're done writing, no writes will ever succeed.
                    if (parent._doneWriting != null)
                    {
                        return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
                            new ValueTask<bool>(Task.FromException<bool>(parent._doneWriting)) :
                            default;
                    }
 
                    // If there's space to write, a write is possible.
                    // And if the mode involves dropping/ignoring, we can always write, as even if it's
                    // full we'll just drop an element to make room.
                    if (parent._items.Count < parent._bufferedCapacity || parent._mode != BoundedChannelFullMode.Wait)
                    {
                        return new ValueTask<bool>(true);
                    }
 
                    // We're still allowed to write, but there's no space, so ensure a waiter is queued and return it.
 
                    // If we're able to use the singleton waiter, do so.
                    if (!cancellationToken.CanBeCanceled)
                    {
                        AsyncOperation<bool> singleton = _waiterSingleton;
                        if (singleton.TryOwnAndReset())
                        {
                            ChannelUtilities.QueueWaiter(ref parent._waitingWritersTail, singleton);
                            return singleton.ValueTaskOfT;
                        }
                    }
 
                    // Otherwise, queue a waiter.
                    var waiter = new AsyncOperation<bool>(runContinuationsAsynchronously: true, cancellationToken);
                    ChannelUtilities.QueueWaiter(ref parent._waitingWritersTail, waiter);
                    return waiter.ValueTaskOfT;
                }
            }
 
            public override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask(Task.FromCanceled(cancellationToken));
                }
 
                AsyncOperation<T>? blockedReader = null;
                AsyncOperation<bool>? waitingReadersTail = null;
 
                BoundedChannel<T> parent = _parent;
 
                bool releaseLock = false;
                try
                {
                    Monitor.Enter(parent.SyncObj, ref releaseLock);
 
                    parent.AssertInvariants();
 
                    // If we're done writing, trying to write is an error.
                    if (parent._doneWriting != null)
                    {
                        return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting)));
                    }
 
                    // Get the number of items in the channel currently.
                    int count = parent._items.Count;
 
                    if (count == 0)
                    {
                        // There are no items in the channel, which means we may have blocked/waiting readers.
 
                        // If there are any blocked readers, find one that's not canceled
                        // and store it to complete outside of the lock, in case it has
                        // continuations that'll run synchronously
                        while (!parent._blockedReaders.IsEmpty)
                        {
                            AsyncOperation<T> r = parent._blockedReaders.DequeueHead();
                            if (r.UnregisterCancellation()) // ensure that once we grab it, we own its completion
                            {
                                blockedReader = r;
                                break;
                            }
                        }
 
                        if (blockedReader == null)
                        {
                            // If there wasn't a blocked reader, then store the item. If no one's waiting
                            // to be notified about a 0-to-1 transition, we're done.
                            parent._items.EnqueueTail(item);
                            waitingReadersTail = parent._waitingReadersTail;
                            if (waitingReadersTail == null)
                            {
                                return default;
                            }
                            parent._waitingReadersTail = null;
                        }
                    }
                    else if (count < parent._bufferedCapacity)
                    {
                        // There's room in the channel.  Since we're not transitioning from 0-to-1 and
                        // since there's room, we can simply store the item and exit without having to
                        // worry about blocked/waiting readers.
                        parent._items.EnqueueTail(item);
                        return default;
                    }
                    else if (parent._mode == BoundedChannelFullMode.Wait)
                    {
                        // The channel is full and we're in a wait mode.  We need to queue a writer.
 
                        // If we're able to use the singleton writer, do so.
                        if (!cancellationToken.CanBeCanceled)
                        {
                            VoidAsyncOperationWithData<T> singleton = _writerSingleton;
                            if (singleton.TryOwnAndReset())
                            {
                                singleton.Item = item;
                                parent._blockedWriters.EnqueueTail(singleton);
                                return singleton.ValueTask;
                            }
                        }
 
                        // Otherwise, queue a new writer.
                        var writer = new VoidAsyncOperationWithData<T>(runContinuationsAsynchronously: true, cancellationToken);
                        writer.Item = item;
                        parent._blockedWriters.EnqueueTail(writer);
                        return writer.ValueTask;
                    }
                    else if (parent._mode == BoundedChannelFullMode.DropWrite)
                    {
                        // The channel is full and we're in ignore mode.
                        // Ignore the item but say we accepted it.
                        Monitor.Exit(parent.SyncObj);
                        releaseLock = false;
                        parent._itemDropped?.Invoke(item);
                        return default;
                    }
                    else
                    {
                        // The channel is full, and we're in a dropping mode.
                        // Drop either the oldest or the newest and write the new item.
                        T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
                            parent._items.DequeueTail() :
                            parent._items.DequeueHead();
 
                        parent._items.EnqueueTail(item);
 
                        Monitor.Exit(parent.SyncObj);
                        releaseLock = false;
                        parent._itemDropped?.Invoke(droppedItem);
 
                        return default;
                    }
                }
                finally
                {
                    if (releaseLock)
                    {
                        Monitor.Exit(parent.SyncObj);
                    }
                }
 
                // We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
                if (blockedReader != null)
                {
                    // Transfer the written item to the blocked reader.
                    bool success = blockedReader.TrySetResult(item);
                    Debug.Assert(success, "We should always be able to complete the reader.");
                }
                else
                {
                    // We stored an item bringing the count up from 0 to 1.  Alert
                    // any waiting readers that there may be something for them to consume.
                    // Since we're no longer holding the lock, it's possible we'll end up
                    // waking readers that have since come in.
                    ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true);
                }
 
                return default;
            }
 
            /// <summary>Gets the number of items in the channel. This should only be used by the debugger.</summary>
            private int ItemsCountForDebugger => _parent._items.Count;
 
            /// <summary>Gets the capacity of the channel. This should only be used by the debugger.</summary>
            private int CapacityForDebugger => _parent._bufferedCapacity;
 
            /// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary>
            IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _parent._items.GetEnumerator();
        }
 
        [Conditional("DEBUG")]
        private void AssertInvariants()
        {
            Debug.Assert(SyncObj != null, "The sync obj must not be null.");
            Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");
 
            if (!_items.IsEmpty)
            {
                Debug.Assert(_blockedReaders.IsEmpty, "There are items available, so there shouldn't be any blocked readers.");
                Debug.Assert(_waitingReadersTail == null, "There are items available, so there shouldn't be any waiting readers.");
            }
            if (_items.Count < _bufferedCapacity)
            {
                Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers.");
                Debug.Assert(_waitingWritersTail == null, "There's space available, so there shouldn't be any waiting writers.");
            }
            if (!_blockedReaders.IsEmpty)
            {
                Debug.Assert(_items.IsEmpty, "There shouldn't be queued items if there's a blocked reader.");
                Debug.Assert(_blockedWriters.IsEmpty, "There shouldn't be any blocked writer if there's a blocked reader.");
            }
            if (!_blockedWriters.IsEmpty)
            {
                Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer.");
                Debug.Assert(_blockedReaders.IsEmpty, "There shouldn't be any blocked readers if there's a blocked writer.");
            }
            if (_completion.Task.IsCompleted)
            {
                Debug.Assert(_doneWriting != null, "We can only complete if we're done writing.");
            }
        }
 
        /// <summary>Gets the number of items in the channel.  This should only be used by the debugger.</summary>
        private int ItemsCountForDebugger => _items.Count;
 
        /// <summary>Report if the channel is closed or not. This should only be used by the debugger.</summary>
        private bool ChannelIsClosedForDebugger => _doneWriting != null;
 
        /// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary>
        IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _items.GetEnumerator();
    }
}