File: System\Threading\Channels\ChannelReader.cs
Web Access
Project: src\src\libraries\System.Threading.Channels\src\System.Threading.Channels.csproj (System.Threading.Channels)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
 
namespace System.Threading.Channels
{
    /// <summary>
    /// Base class for the consuming (read) side of a channel.
    /// </summary>
    /// <remarks>
    /// Derived types must supply <see cref="TryRead"/> and <see cref="WaitToReadAsync"/>; the remaining
    /// members have default implementations built on those two or report that a feature is unsupported.
    /// </remarks>
    /// <typeparam name="T">The type of the items that can be read from the channel.</typeparam>
    public abstract partial class ChannelReader<T>
    {
        /// <summary>
        /// Gets a <see cref="Task"/> that completes once no further data will ever
        /// become available to read from this channel.
        /// </summary>
        /// <remarks>The base implementation returns the shared <c>ChannelUtilities.s_neverCompletingTask</c>.</remarks>
        public virtual Task Completion
        {
            get { return ChannelUtilities.s_neverCompletingTask; }
        }

        /// <summary>Gets a value indicating whether <see cref="Count"/> may be used on this <see cref="ChannelReader{T}"/> instance.</summary>
        /// <remarks>The base implementation always reports <c>false</c>.</remarks>
        public virtual bool CanCount
        {
            get { return false; }
        }

        /// <summary>Gets a value indicating whether <see cref="TryPeek"/> may be used on this <see cref="ChannelReader{T}"/> instance.</summary>
        /// <remarks>The base implementation always reports <c>false</c>.</remarks>
        public virtual bool CanPeek
        {
            get { return false; }
        }

        /// <summary>Gets the number of items currently available from this channel reader.</summary>
        /// <exception cref="NotSupportedException">This instance does not support counting (the base implementation always throws).</exception>
        public virtual int Count
        {
            get { throw new NotSupportedException(); }
        }

        /// <summary>Attempts to synchronously take an item out of the channel.</summary>
        /// <param name="item">Receives the item that was read, or a default value when nothing could be read.</param>
        /// <returns><c>true</c> if an item was read; otherwise, <c>false</c>.</returns>
        public abstract bool TryRead([MaybeNullWhen(false)] out T item);

        /// <summary>Attempts to look at an item from the channel.</summary>
        /// <param name="item">Receives the peeked item, or a default value when nothing could be peeked.</param>
        /// <returns><c>true</c> if an item was peeked; otherwise, <c>false</c>.</returns>
        /// <remarks>The base implementation never produces an item: it assigns a default value and returns <c>false</c>.</remarks>
        public virtual bool TryPeek([MaybeNullWhen(false)] out T item)
        {
            // Base behavior: nothing to peek at, so hand back a default value and report failure.
            item = default;
            return false;
        }

        /// <summary>Returns a <see cref="ValueTask{Boolean}"/> that completes when data is available to read.</summary>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the wait.</param>
        /// <returns>
        /// A <see cref="ValueTask{Boolean}"/> whose result is <c>true</c> when data is available to read,
        /// or <c>false</c> when no further data will ever be available to read.
        /// </returns>
        public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);

        /// <summary>Asynchronously reads a single item from the channel.</summary>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the read.</param>
        /// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous read; its result is the item that was read.</returns>
        /// <remarks>
        /// The default implementation first tries <see cref="TryRead"/> synchronously and, if nothing is available,
        /// alternates between <see cref="WaitToReadAsync"/> and <see cref="TryRead"/> until an item is obtained.
        /// </remarks>
        /// <exception cref="ChannelClosedException">
        /// <see cref="WaitToReadAsync"/> reported that no more data will ever be available.
        /// </exception>
        public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
        {
            // A token that is already canceled yields a canceled task without touching the channel.
            if (cancellationToken.IsCancellationRequested)
            {
                return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
            }

            // Fast path: an item may already be available, in which case no async machinery is needed.
            try
            {
                if (TryRead(out T? immediate))
                {
                    return new ValueTask<T>(immediate);
                }
            }
            catch (Exception error) when (!(error is ChannelClosedException) && !(error is OperationCanceledException))
            {
                // Any failure other than "closed" or "canceled" is surfaced through the returned task;
                // those two exception types are not caught here and propagate to the caller directly.
                return new ValueTask<T>(Task.FromException<T>(error));
            }

            // Slow path: wait for data, then try again.
            return WaitThenReadAsync(cancellationToken);

            async ValueTask<T> WaitThenReadAsync(CancellationToken token)
            {
                // Keep waiting while the channel says data may arrive; another reader can take the
                // item between the wait completing and our TryRead, so a failed read just loops.
                while (await WaitToReadAsync(token).ConfigureAwait(false))
                {
                    if (TryRead(out T? next))
                    {
                        return next;
                    }
                }

                // The wait reported that nothing more will ever be readable.
                throw new ChannelClosedException();
            }
        }

        /// <summary>Creates an <see cref="IAsyncEnumerable{T}"/> that reads all of the data from the channel.</summary>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> used to cancel the enumeration.</param>
        /// <remarks>
        /// Every <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> call that returns <c>true</c> reads the next item out of the channel.
        /// <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> returns <c>false</c> once no more data is or will ever be available to read.
        /// </remarks>
        /// <returns>The created async enumerable.</returns>
        public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            for (; ; )
            {
                // Stop enumerating as soon as the channel reports that no more data will arrive.
                bool moreData = await WaitToReadAsync(cancellationToken).ConfigureAwait(false);
                if (!moreData)
                {
                    yield break;
                }

                // Drain everything that is synchronously available before waiting again.
                while (TryRead(out T? current))
                {
                    yield return current;
                }
            }
        }
    }
}