// File: System\Threading\Channels\RendezvousChannel.cs
// Web Access
// Project: src\src\libraries\System.Threading.Channels\src\System.Threading.Channels.csproj (System.Threading.Channels)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
 
namespace System.Threading.Channels
{
    /// <summary>Provides an unbuffered channel where readers and writers must directly hand off to each other.</summary>
    [DebuggerDisplay("{DebuggerDisplay,nq}")]
    internal sealed class RendezvousChannel<T> : Channel<T>
    {
        /// <summary>Whether to succeed writes immediately even when there's no rendezvousing reader.</summary>
        private readonly bool _dropWrites;
 
        /// <summary>The delegate that will be invoked when a write is dropped because there is no reader to rendezvous with.</summary>
        private readonly Action<T>? _itemDropped;
 
        /// <summary>Task signaled when the channel has completed.</summary>
        private readonly TaskCompletionSource _completion;
 
        /// <summary>Head of linked list of blocked ReadAsync calls.</summary>
        private BlockedReadAsyncOperation<T>? _blockedReadersHead;
 
        /// <summary>Head of linked list of blocked WriteAsync calls.</summary>
        private BlockedWriteAsyncOperation<T>? _blockedWritersHead;
 
        /// <summary>Head of linked list of waiting WaitToReadAsync calls.</summary>
        private WaitingReadAsyncOperation? _waitingReadersHead;
 
        /// <summary>Head of linked list of waiting WaitToWriteAsync calls.</summary>
        private WaitingWriteAsyncOperation? _waitingWritersHead;
 
        /// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
        private readonly bool _runContinuationsAsynchronously;
 
        /// <summary>Set to non-null once Complete has been called.</summary>
        private Exception? _doneWriting;
 
        /// <summary>Creates a new <see cref="RendezvousChannel{T}"/> together with its reader and writer.</summary>
        /// <param name="mode">
        /// The behavior of a write that has no reader to hand off to. <see cref="BoundedChannelFullMode.Wait"/> makes
        /// such writes wait for a reader; every other mode makes them succeed immediately, dropping the item.
        /// </param>
        /// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
        /// <param name="itemDropped">Optional delegate that is invoked with each item that is dropped.</param>
        internal RendezvousChannel(BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action<T>? itemDropped)
        {
            TaskCreationOptions completionOptions = TaskCreationOptions.None;
            if (runContinuationsAsynchronously)
            {
                completionOptions = TaskCreationOptions.RunContinuationsAsynchronously;
            }

            _completion = new TaskCompletionSource(completionOptions);
            _itemDropped = itemDropped;
            _runContinuationsAsynchronously = runContinuationsAsynchronously;
            _dropWrites = mode != BoundedChannelFullMode.Wait;

            // The reader's constructor reads configuration off of this instance, so the
            // reader and writer are created only after the fields above have been assigned.
            Reader = new RendezvousChannelReader(this);
            Writer = new RendezvousChannelWriter(this);
        }
 
        [DebuggerDisplay("{DebuggerDisplay,nq}")]
        private sealed class RendezvousChannelReader : ChannelReader<T>
        {
            internal readonly RendezvousChannel<T> _parent;
            private readonly BlockedReadAsyncOperation<T> _readerSingleton;
            private readonly WaitingReadAsyncOperation _waiterSingleton;
 
            /// <summary>Initializes the reader for the specified channel.</summary>
            /// <param name="parent">The channel whose state this reader operates on.</param>
            internal RendezvousChannelReader(RendezvousChannel<T> parent)
            {
                bool runContinuationsAsynchronously = parent._runContinuationsAsynchronously;

                _parent = parent;

                // Reusable operation instances that ReadAsync and WaitToReadAsync try to claim
                // for calls whose token cannot be canceled.
                _readerSingleton = new BlockedReadAsyncOperation<T>(runContinuationsAsynchronously, pooled: true);
                _waiterSingleton = new WaitingReadAsyncOperation(runContinuationsAsynchronously, pooled: true);
            }
 
            /// <summary>Gets the task of the parent channel's completion source.</summary>
            public override Task Completion { get => _parent._completion.Task; }

            /// <summary>Gets whether <see cref="Count"/> is supported; always <see langword="true"/>.</summary>
            public override bool CanCount { get => true; }

            /// <summary>Gets whether peeking is supported; always <see langword="true"/>.</summary>
            public override bool CanPeek { get => true; }

            /// <summary>Gets the number of items available to read; always zero.</summary>
            public override int Count { get => 0; }
 
            /// <summary>Attempts to take an item directly from a writer that is blocked in WriteAsync.</summary>
            /// <param name="item">On success, the item carried by the blocked writer; otherwise, the default value.</param>
            /// <returns>
            /// <see langword="true"/> if a blocked writer was reserved and its item taken;
            /// <see langword="false"/> if the channel is completed or no blocked writer could be reserved.
            /// </returns>
            public override bool TryRead([MaybeNullWhen(false)] out T item)
            {
                RendezvousChannel<T> channel = _parent;

                // While holding the lock, try to reserve a blocked writer, but only if the channel is still open.
                BlockedWriteAsyncOperation<T>? writer;
                lock (channel.SyncObj)
                {
                    channel.AssertInvariants();

                    writer = channel._doneWriting is null ?
                        ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref channel._blockedWritersHead) :
                        null;
                }

                if (writer is null)
                {
                    item = default;
                    return false;
                }

                // Take the writer's item first, then complete the writer's pending operation outside of the lock.
                item = writer.Item!;
                writer.DangerousSetResult(default);
                return true;
            }
 
            /// <summary>Attempts to observe, without consuming, the item of the writer at the head of the blocked-writers list.</summary>
            /// <param name="item">On success, the item carried by the head blocked writer; otherwise, the default value.</param>
            /// <returns>
            /// <see langword="true"/> if the channel is not completed and a blocked writer exists;
            /// otherwise, <see langword="false"/>.
            /// </returns>
            public override bool TryPeek([MaybeNullWhen(false)] out T item)
            {
                RendezvousChannel<T> channel = _parent;

                lock (channel.SyncObj)
                {
                    channel.AssertInvariants();

                    // The head writer is only looked at, not dequeued, and only while the channel is still open.
                    BlockedWriteAsyncOperation<T>? head = channel._doneWriting is null ? channel._blockedWritersHead : null;
                    if (head is not null)
                    {
                        item = head.Item!;
                        return true;
                    }

                    item = default;
                    return false;
                }
            }
 
            /// <summary>Reads an item, completing synchronously if a writer is already blocked and otherwise queuing a blocked reader.</summary>
            /// <param name="cancellationToken">Token checked up front and passed to a newly allocated blocked reader.</param>
            /// <returns>
            /// A canceled task if <paramref name="cancellationToken"/> is already canceled; the value produced by
            /// <c>ChannelUtilities.GetInvalidCompletionValueTask</c> if the channel has been completed; a completed
            /// <see cref="ValueTask{T}"/> carrying a blocked writer's item if one could be reserved; otherwise the
            /// pending task of the queued blocked reader.
            /// </returns>
            public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
            {
                RendezvousChannel<T> parent = _parent;

                // Fast path: don't touch any channel state if cancellation was already requested.
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
                }

                // Exactly one of 'blockedWriter' and 'reader' ends up non-null after the lock is released.
                BlockedReadAsyncOperation<T>? reader = null;
                WaitingWriteAsyncOperation? waitingWriters = null;
                BlockedWriteAsyncOperation<T>? blockedWriter = null;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();

                    // If we're done writing so that there will never be more items, fail.
                    if (parent._doneWriting is not null)
                    {
                        return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
                    }

                    // Reserve a blocked writer if one is available.
                    blockedWriter = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedWritersHead);

                    // If we couldn't get one, queue a blocked reader (reusing the singleton when the token can't be
                    // canceled and the singleton can be claimed), and reserve any waiting writers to alert.
                    if (blockedWriter is null)
                    {
                        reader =
                            !cancellationToken.CanBeCanceled && _readerSingleton.TryOwnAndReset() ? _readerSingleton :
                            new(parent._runContinuationsAsynchronously, cancellationToken, cancellationCallback: _parent.CancellationCallbackDelegate);
                        ChannelUtilities.Enqueue(ref parent._blockedReadersHead, reader);

                        waitingWriters = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingWritersHead);
                    }
                }

                // Either complete the reserved blocked writer, transferring its item to the read,
                // or return the blocked reader's task, also alerting any waiting writers.
                // All completions happen here, after the lock has been released.
                ValueTask<T> result;
                if (blockedWriter is not null)
                {
                    Debug.Assert(reader is null);
                    Debug.Assert(waitingWriters is null);

                    // The item is captured before the writer is completed.
                    // NOTE(review): presumably because a pooled writer may be reused once completed - confirm.
                    result = new(blockedWriter.Item!);
                    blockedWriter.DangerousSetResult(default);
                }
                else
                {
                    Debug.Assert(reader is not null);

                    // Tell WaitToWriteAsync callers that a write can now succeed.
                    ChannelUtilities.DangerousSetOperations(waitingWriters, result: true);
                    result = reader.ValueTaskOfT;
                }

                return result;
            }
 
            /// <summary>Waits until a read may be possible, i.e. until a writer is blocked waiting to hand off an item.</summary>
            /// <param name="cancellationToken">Token checked up front and passed to a newly allocated waiter.</param>
            /// <returns>
            /// A canceled task if <paramref name="cancellationToken"/> is already canceled; a faulted task if the channel
            /// was completed with an error; <see langword="false"/> if the channel was completed without an error;
            /// <see langword="true"/> if a writer is currently blocked; otherwise the pending task of a queued waiter.
            /// </returns>
            public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
                }

                RendezvousChannel<T> channel = _parent;
                lock (channel.SyncObj)
                {
                    channel.AssertInvariants();

                    // A completed channel will never yield another item: surface the
                    // completion error if there is one, and otherwise report false.
                    Exception? doneWriting = channel._doneWriting;
                    if (doneWriting is not null)
                    {
                        if (doneWriting == ChannelUtilities.s_doneWritingSentinel)
                        {
                            return default;
                        }

                        return new ValueTask<bool>(Task.FromException<bool>(doneWriting));
                    }

                    // A blocked writer means a read can succeed right now.
                    if (channel._blockedWritersHead is not null)
                    {
                        return new ValueTask<bool>(true);
                    }

                    // Otherwise queue a waiter, reusing the singleton when the token
                    // can't be canceled and the singleton can be claimed.
                    WaitingReadAsyncOperation waiter;
                    if (!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset())
                    {
                        waiter = _waiterSingleton;
                    }
                    else
                    {
                        waiter = new WaitingReadAsyncOperation(channel._runContinuationsAsynchronously, cancellationToken, cancellationCallback: channel.CancellationCallbackDelegate);
                    }

                    ChannelUtilities.Enqueue(ref channel._waitingReadersHead, waiter);
                    return waiter.ValueTaskOfT;
                }
            }
 
            /// <summary>Gets a debugger string with the number of queued ReadAsync and WaitToReadAsync operations.</summary>
            internal string DebuggerDisplay
            {
                get
                {
                    RendezvousChannel<T> channel = _parent;

                    long pendingReads;
                    long pendingWaits;

                    // Count both lists under the lock so that the two numbers are consistent with each other.
                    lock (channel.SyncObj)
                    {
                        pendingReads = ChannelUtilities.CountOperations(channel._blockedReadersHead);
                        pendingWaits = ChannelUtilities.CountOperations(channel._waitingReadersHead);
                    }

                    return $"ReadAsync={pendingReads}, WaitToReadAsync={pendingWaits}";
                }
            }
        }
 
        [DebuggerDisplay("{DebuggerDisplay,nq}")]
        private sealed class RendezvousChannelWriter : ChannelWriter<T>
        {
            internal readonly RendezvousChannel<T> _parent;
            private readonly BlockedWriteAsyncOperation<T> _writerSingleton;
            private readonly WaitingWriteAsyncOperation _waiterSingleton;
 
            /// <summary>Initializes the writer for the specified channel.</summary>
            /// <param name="parent">The channel whose state this writer operates on.</param>
            internal RendezvousChannelWriter(RendezvousChannel<T> parent)
            {
                // NOTE(review): the reusable operations below always run continuations asynchronously, whereas the
                // operations allocated by WriteAsync/WaitToWriteAsync use parent._runContinuationsAsynchronously;
                // confirm that the difference is intentional.
                const bool RunContinuationsAsynchronously = true;

                _parent = parent;
                _writerSingleton = new BlockedWriteAsyncOperation<T>(RunContinuationsAsynchronously, pooled: true);
                _waiterSingleton = new WaitingWriteAsyncOperation(RunContinuationsAsynchronously, pooled: true);
            }
 
            /// <summary>Marks the channel as completed and completes or fails every operation that is still queued.</summary>
            /// <param name="error">Optional error to complete the channel with; <see langword="null"/> for a normal completion.</param>
            /// <returns>
            /// <see langword="true"/> if this call transitioned the channel to completed;
            /// <see langword="false"/> if the channel had already been completed.
            /// </returns>
            public override bool TryComplete(Exception? error)
            {
                RendezvousChannel<T> parent = _parent;

                BlockedReadAsyncOperation<T>? blockedReadersHead;
                BlockedWriteAsyncOperation<T>? blockedWritersHead;
                WaitingReadAsyncOperation? waitingReadersHead;
                WaitingWriteAsyncOperation? waitingWritersHead;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();

                    // If we've already marked the channel as completed, bail.
                    if (parent._doneWriting is not null)
                    {
                        return false;
                    }

                    // Mark that we're done writing. A sentinel stands in for "completed without an error"
                    // so that a non-null value always means "completed".
                    parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;

                    // Snag the queues while holding the lock, so that we don't need to worry
                    // about concurrent mutation, such as from cancellation of pending operations.
                    blockedReadersHead = parent._blockedReadersHead;
                    blockedWritersHead = parent._blockedWritersHead;
                    waitingReadersHead = parent._waitingReadersHead;
                    waitingWritersHead = parent._waitingWritersHead;
                    parent._blockedReadersHead = null;
                    parent._blockedWritersHead = null;
                    parent._waitingReadersHead = null;
                    parent._waitingWritersHead = null;
                }

                // Complete the channel's task, as no more data can possibly arrive at this point.  We do this outside
                // of the lock in case we'll be running synchronous completions, and we
                // do it before completing blocked/waiting readers, so that when they
                // wake up they'll see the task as being completed.
                ChannelUtilities.Complete(parent._completion, error);

                // Complete all pending operations. We don't need to worry about concurrent mutation here:
                // No other writers or readers will be able to register operations, and any cancellation callbacks
                // will see the queues as being null and exit immediately.
                // Blocked ReadAsync/WriteAsync operations are failed; WaitToReadAsync/WaitToWriteAsync
                // operations are handed result: false together with the error.
                ChannelUtilities.FailOperations(blockedReadersHead, ChannelUtilities.CreateInvalidCompletionException(error));
                ChannelUtilities.FailOperations(blockedWritersHead, ChannelUtilities.CreateInvalidCompletionException(error));
                ChannelUtilities.SetOrFailOperations(waitingReadersHead, result: false, error: error);
                ChannelUtilities.SetOrFailOperations(waitingWritersHead, result: false, error: error);

                // Successfully transitioned to completed.
                return true;
            }
 
            /// <summary>Attempts to hand the item directly to a reader that is blocked in ReadAsync.</summary>
            /// <param name="item">The item to write.</param>
            /// <returns>
            /// <see langword="true"/> if the item was handed to a blocked reader, or if no reader was available but the
            /// channel is configured to drop writes (in which case the item is passed to the item-dropped delegate, if any);
            /// <see langword="false"/> if the channel has been completed, or if no reader was available and the
            /// channel is not configured to drop writes.
            /// </returns>
            public override bool TryWrite(T item)
            {
                RendezvousChannel<T> parent = _parent;

                BlockedReadAsyncOperation<T>? blockedReader;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();

                    // Once the channel has been completed no write may succeed, not even one that would
                    // otherwise be dropped. This keeps TryWrite consistent with WriteAsync (which fails)
                    // and WaitToWriteAsync (which reports false) on a completed channel.
                    if (parent._doneWriting is not null)
                    {
                        return false;
                    }

                    // Reserve a blocked reader if one is available.
                    blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);
                }

                // If we got one, transfer the item to the read and complete successfully.
                // The completion happens outside of the lock.
                if (blockedReader is not null)
                {
                    blockedReader.DangerousSetResult(item);
                    return true;
                }

                // There's no concurrent reader, but if we're configured to drop writes, we can succeed immediately.
                if (parent._dropWrites)
                {
                    parent._itemDropped?.Invoke(item);
                    return true;
                }

                return false;
            }
 
            /// <summary>Waits until a write may be possible.</summary>
            /// <param name="cancellationToken">Token checked up front and passed to a newly allocated waiter.</param>
            /// <returns>
            /// A canceled task if <paramref name="cancellationToken"/> is already canceled; a faulted task if the channel
            /// was completed with an error; <see langword="false"/> if the channel was completed without an error;
            /// <see langword="true"/> if a reader is currently blocked or the channel drops writes; otherwise the
            /// pending task of a queued waiter.
            /// </returns>
            public override ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
                }

                RendezvousChannel<T> channel = _parent;
                lock (channel.SyncObj)
                {
                    channel.AssertInvariants();

                    // If we're done writing, a write will never be possible: surface the
                    // completion error if there is one, and otherwise report false.
                    Exception? doneWriting = channel._doneWriting;
                    if (doneWriting is not null)
                    {
                        if (doneWriting == ChannelUtilities.s_doneWritingSentinel)
                        {
                            return default;
                        }

                        return new ValueTask<bool>(Task.FromException<bool>(doneWriting));
                    }

                    // A write can succeed right now if the channel drops unmatched writes
                    // or if a reader is blocked waiting for an item.
                    if (channel._dropWrites || channel._blockedReadersHead is not null)
                    {
                        return new ValueTask<bool>(true);
                    }

                    // No reader is available yet, but one may arrive later, so queue a waiter, reusing
                    // the singleton when the token can't be canceled and the singleton can be claimed.
                    WaitingWriteAsyncOperation waiter;
                    if (!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset())
                    {
                        waiter = _waiterSingleton;
                    }
                    else
                    {
                        waiter = new WaitingWriteAsyncOperation(channel._runContinuationsAsynchronously, cancellationToken, cancellationCallback: channel.CancellationCallbackDelegate);
                    }

                    ChannelUtilities.Enqueue(ref channel._waitingWritersHead, waiter);
                    return waiter.ValueTaskOfT;
                }
            }
 
            /// <summary>Writes an item, handing it to a blocked reader if there is one and otherwise either dropping it or queuing a blocked writer.</summary>
            /// <param name="item">The item to write.</param>
            /// <param name="cancellationToken">Token checked up front and passed to a newly allocated blocked writer.</param>
            /// <returns>
            /// A canceled task if <paramref name="cancellationToken"/> is already canceled; a faulted task if the channel
            /// has been completed; a completed <see cref="ValueTask"/> if the item was handed to a blocked reader or was
            /// dropped; otherwise the pending task of the queued blocked writer.
            /// </returns>
            public override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
            {
                RendezvousChannel<T> parent = _parent;

                if (cancellationToken.IsCancellationRequested)
                {
                    // The returned ValueTask is non-generic, so a non-generic canceled task is all that is needed.
                    return new ValueTask(Task.FromCanceled(cancellationToken));
                }

                BlockedWriteAsyncOperation<T>? writer = null;
                WaitingReadAsyncOperation? waitingReaders = null;
                BlockedReadAsyncOperation<T>? blockedReader = null;
                lock (parent.SyncObj)
                {
                    parent.AssertInvariants();

                    // If we've already been marked as done for writing, we shouldn't be writing.
                    if (parent._doneWriting is not null)
                    {
                        return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting)));
                    }

                    // Reserve a blocked reader if one is available.
                    blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);

                    // If we couldn't get one and writes aren't dropped, queue a blocked writer carrying the item (reusing
                    // the singleton when the token can't be canceled and the singleton can be claimed), and reserve any
                    // waiting readers to alert.
                    if (blockedReader is null && !parent._dropWrites)
                    {
                        writer =
                            !cancellationToken.CanBeCanceled && _writerSingleton.TryOwnAndReset() ? _writerSingleton :
                            new(parent._runContinuationsAsynchronously, cancellationToken, cancellationCallback: _parent.CancellationCallbackDelegate);
                        writer.Item = item;
                        ChannelUtilities.Enqueue(ref parent._blockedWritersHead, writer);

                        waitingReaders = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingReadersHead);
                    }
                }

                // All completions and callbacks below run after the lock has been released.
                if (writer is not null)
                {
                    // The write is pending: tell WaitToReadAsync callers that a read can now succeed.
                    Debug.Assert(blockedReader is null);
                    ChannelUtilities.DangerousSetOperations(waitingReaders, result: true);
                    return writer.ValueTask;
                }

                if (blockedReader is not null)
                {
                    // Hand the item directly to the reserved reader.
                    blockedReader.DangerousSetResult(item);
                }
                else
                {
                    // No reader and no queued writer: the channel drops the item.
                    Debug.Assert(parent._dropWrites);
                    parent._itemDropped?.Invoke(item);
                }

                return default;
            }
 
            /// <summary>Gets a debugger string with the number of queued WriteAsync and WaitToWriteAsync operations.</summary>
            internal string DebuggerDisplay
            {
                get
                {
                    RendezvousChannel<T> channel = _parent;

                    long pendingWrites;
                    long pendingWaits;

                    // Count both lists under the lock so that the two numbers are consistent with each other.
                    lock (channel.SyncObj)
                    {
                        pendingWrites = ChannelUtilities.CountOperations(channel._blockedWritersHead);
                        pendingWaits = ChannelUtilities.CountOperations(channel._waitingWritersHead);
                    }

                    return $"WriteAsync={pendingWrites}, WaitToWriteAsync={pendingWaits}";
                }
            }
        }
 
        /// <summary>Gets the object that is locked to synchronize all state on the instance; the completion source doubles as that object.</summary>
        private object SyncObj
        {
            get { return _completion; }
        }
 
        /// <summary>
        /// Gets the callback that is passed as <c>cancellationCallback</c> to the cancelable operations this channel
        /// allocates. The delegate is created on first use and cached in the property's backing field.
        /// </summary>
        /// <remarks>
        /// The callback receives the operation as its state. If it succeeds in transitioning the operation to canceled,
        /// it queues a work item which, under the channel's lock, removes the operation from the list that matches
        /// the operation's type.
        /// </remarks>
        private Action<object?, CancellationToken> CancellationCallbackDelegate =>
            field ??= (state, cancellationToken) =>
            {
                AsyncOperation op = (AsyncOperation)state!;

                // Only the caller that wins the transition to canceled is responsible for unlinking the operation.
                if (op.TrySetCanceled(cancellationToken))
                {
                    // NOTE(review): the removal is queued rather than done inline, presumably so that the
                    // channel's lock is never taken from inside the cancellation callback - confirm.
                    ChannelUtilities.UnsafeQueueUserWorkItem(static state => // escape cancellation callback
                    {
                        lock (state.Key.SyncObj)
                        {
                            // state.Key is the channel and state.Value is the canceled operation.
                            switch (state.Value)
                            {
                                case BlockedReadAsyncOperation<T> blockedReader:
                                    ChannelUtilities.Remove(ref state.Key._blockedReadersHead, blockedReader);
                                    break;

                                case BlockedWriteAsyncOperation<T> blockedWriter:
                                    ChannelUtilities.Remove(ref state.Key._blockedWritersHead, blockedWriter);
                                    break;

                                case WaitingReadAsyncOperation waitingReader:
                                    ChannelUtilities.Remove(ref state.Key._waitingReadersHead, waitingReader);
                                    break;

                                case WaitingWriteAsyncOperation waitingWriter:
                                    ChannelUtilities.Remove(ref state.Key._waitingWritersHead, waitingWriter);
                                    break;

                                default:
                                    Debug.Fail($"Unexpected operation: {state.Value}");
                                    break;
                            }
                        }
                    }, new KeyValuePair<RendezvousChannel<T>, AsyncOperation>(this, op));
                }
            };
 
        /// <summary>Gets a debugger string that joins the reader's and the writer's debugger strings.</summary>
        private string DebuggerDisplay
        {
            get
            {
                string readerDisplay = ((RendezvousChannelReader)Reader).DebuggerDisplay;
                string writerDisplay = ((RendezvousChannelWriter)Writer).DebuggerDisplay;
                return $"{readerDisplay}, {writerDisplay}";
            }
        }
 
        /// <summary>
        /// Checks, in DEBUG builds only, the relationships that must hold between the channel's fields.
        /// Must be called while holding the lock on <see cref="SyncObj"/>.
        /// </summary>
        [Conditional("DEBUG")]
        private void AssertInvariants()
        {
            Debug.Assert(SyncObj is not null, "The sync obj must not be null.");
            Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");

            bool hasBlockedReaders = _blockedReadersHead is not null;
            bool hasBlockedWriters = _blockedWritersHead is not null;

            // Each assert is written as an implication: "if A then B" is expressed as "!A || B".
            Debug.Assert(!hasBlockedReaders || !hasBlockedWriters, "There shouldn't be any blocked writer if there's a blocked reader.");
            Debug.Assert(!hasBlockedReaders || _waitingWritersHead is null, "There shouldn't be any waiting writers if there's a blocked reader.");

            Debug.Assert(!hasBlockedWriters || !hasBlockedReaders, "There shouldn't be any blocked readers if there's a blocked writer.");
            Debug.Assert(!hasBlockedWriters || _waitingReadersHead is null, "There shouldn't be any waiting readers if there's a blocked writer.");

            Debug.Assert(!_completion.Task.IsCompleted || _doneWriting is not null, "We can only complete if we're done writing.");
        }
    }
}