File: System\IO\Pipelines\PipeReader.cs
Web Access
Project: src\src\libraries\System.IO.Pipelines\src\System.IO.Pipelines.csproj (System.IO.Pipelines)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.IO.Pipelines
{
    /// <summary>Defines a class that provides access to a read side of pipe.</summary>
    public abstract partial class PipeReader
    {
        private PipeReaderStream? _stream;
 
        /// <summary>Attempts to synchronously read data from the <see cref="System.IO.Pipelines.PipeReader" />.</summary>
        /// <param name="result">When this method returns <see langword="true" />, this value is set to a <see cref="System.IO.Pipelines.ReadResult" /> instance that represents the result of the read call; otherwise, this value is set to <see langword="default" />.</param>
        /// <returns><see langword="true" /> if data was available, or if the call was canceled or the writer was completed; otherwise, <see langword="false" />.</returns>
        /// <remarks><format type="text/markdown"><![CDATA[
        /// If the pipe returns <see langword="false" />, there is no need to call <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" />.
        /// [!IMPORTANT]
        /// The `System.IO.Pipelines.PipeReader` implementation returned by `System.IO.Pipelines.PipeReader.Create(System.IO.Stream, System.IO.Pipelines.StreamPipeReaderOptions?)`
        /// will not read new data from the backing `System.IO.Stream` when `System.IO.Pipelines.PipeReader.TryRead(out System.IO.Pipelines.ReadResult)` is called.
        ///
        /// `System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)` must be called to read new data from the backing `System.IO.Stream`.
        /// Any unconsumed data from a previous asynchronous read will be available to `System.IO.Pipelines.PipeReader.TryRead(out System.IO.Pipelines.ReadResult)`.
        /// ]]></format></remarks>
        public abstract bool TryRead(out ReadResult result);
 
        /// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
        /// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
        public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);
 
        /// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
        /// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
        /// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
        /// <remarks>
        ///     <para>
        ///     The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.
        ///     </para>
        ///     <para>
        ///     Passing a value of 0 for <paramref name="minimumSize" /> will return a <see cref="System.Threading.Tasks.ValueTask{T}" /> that will not complete until
        ///     further data is available. You should instead call <see cref="System.IO.Pipelines.PipeReader.TryRead" /> to avoid a blocking call.
        ///     </para>
        ///     <para>
        ///     Subsequent calls to <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> should
        ///     examine at least <paramref name="minimumSize" /> bytes in order to avoid an <see cref="System.InvalidOperationException" />.
        ///     </para>
        /// </remarks>
        public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default)
        {
            if (minimumSize < 0)
            {
                ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
            }
 
            return ReadAtLeastAsyncCore(minimumSize, cancellationToken);
        }
 
        /// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
        /// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
        /// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
        /// <remarks>The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.</remarks>
        protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
        {
            while (true)
            {
                ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;
 
                if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)
                {
                    return result;
                }
 
                // Keep buffering until we get more data
                AdvanceTo(buffer.Start, buffer.End);
            }
        }
 
        /// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed.</summary>
        /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
        /// <remarks>The memory for the consumed data will be released and no longer available.
        /// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
        /// This is equivalent to calling <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> with identical examined and consumed positions.
        /// The examined data communicates to the pipeline when it should signal more data is available.
        /// </remarks>
        public abstract void AdvanceTo(SequencePosition consumed);
 
        /// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed, read and examined.</summary>
        /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
        /// <param name="examined">Marks the extent of the data that has been read and examined.</param>
        /// <remarks>The memory for the consumed data will be released and no longer available.
        /// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
        /// The examined data communicates to the pipeline when it should signal more data is available.</remarks>
        public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
 
        /// <summary>Returns a <see cref="System.IO.Stream" /> representation of the <see cref="System.IO.Pipelines.PipeReader" />.</summary>
        /// <param name="leaveOpen">An optional flag that indicates whether disposing the returned <see cref="System.IO.Stream" /> leaves <see cref="System.IO.Pipelines.PipeReader" /> open (<see langword="true" />) or completes <see cref="System.IO.Pipelines.PipeReader" /> (<see langword="false" />).</param>
        /// <returns>A stream that represents the <see cref="System.IO.Pipelines.PipeReader" />.</returns>
        public virtual Stream AsStream(bool leaveOpen = false)
        {
            if (_stream == null)
            {
                _stream = new PipeReaderStream(this, leaveOpen);
            }
            else if (leaveOpen)
            {
                _stream.LeaveOpen = leaveOpen;
            }
 
            return _stream;
        }
 
        /// <summary>Cancels the pending <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> operation without causing it to throw and without completing the <see cref="System.IO.Pipelines.PipeReader" />. If there is no pending operation, this cancels the next operation.</summary>
        /// <remarks>The canceled <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> operation returns a <see cref="System.IO.Pipelines.ReadResult" /> where <see cref="System.IO.Pipelines.ReadResult.IsCanceled" /> is <see langword="true" />.</remarks>
        public abstract void CancelPendingRead();
 
        /// <summary>Signals to the producer that the consumer is done reading.</summary>
        /// <param name="exception">Optional <see cref="System.Exception" /> indicating a failure that's causing the pipeline to complete.</param>
        public abstract void Complete(Exception? exception = null);
 
        /// <summary>Marks the current pipe reader instance as being complete, meaning no more data will be read from it.</summary>
        /// <param name="exception">An optional exception that indicates the failure that caused the reader to complete.</param>
        /// <returns>A value task that represents the asynchronous complete operation.</returns>
        public virtual ValueTask CompleteAsync(Exception? exception = null)
        {
            try
            {
                Complete(exception);
                return default;
            }
            catch (Exception ex)
            {
                return new ValueTask(Task.FromException(ex));
            }
        }
 
        /// <summary>Registers a callback that executes when the <see cref="System.IO.Pipelines.PipeWriter" /> side of the pipe is completed.</summary>
        /// <param name="callback">The callback to register.</param>
        /// <param name="state">The state object to pass to <paramref name="callback" /> when it's invoked.</param>
        /// <remarks><format type="text/markdown"><![CDATA[
        /// > [!IMPORTANT]
        /// > `OnWriterCompleted` may not be invoked on all implementations of <xref:System.IO.Pipelines.PipeWriter>. This method will be removed in a future release.
        /// ]]></format></remarks>
        [Obsolete("OnWriterCompleted has been deprecated and may not be invoked on all implementations of PipeReader.")]
        public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
        {
 
        }
 
        /// <summary>Creates a <see cref="System.IO.Pipelines.PipeReader" /> wrapping the specified <see cref="System.IO.Stream" />.</summary>
        /// <param name="stream">The stream that the pipe reader will wrap.</param>
        /// <param name="readerOptions">The options to configure the pipe reader.</param>
        /// <returns>A <see cref="System.IO.Pipelines.PipeReader" /> that wraps the <see cref="System.IO.Stream" />.</returns>
        public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null)
        {
            return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
        }
 
        /// <summary>
        /// Creates a <see cref="PipeReader"/> wrapping the specified <see cref="ReadOnlySequence{T}"/>.
        /// </summary>
        /// <param name="sequence">The sequence.</param>
        /// <returns>A <see cref="PipeReader"/> that wraps the <see cref="ReadOnlySequence{T}"/>.</returns>
        public static PipeReader Create(ReadOnlySequence<byte> sequence)
        {
            return new SequencePipeReader(sequence);
        }
 
        /// <summary>Asynchronously reads the bytes from the <see cref="System.IO.Pipelines.PipeReader" /> and writes them to the specified <see cref="System.IO.Pipelines.PipeWriter" />, using a specified buffer size and cancellation token.</summary>
        /// <param name="destination">The pipe writer to which the contents of the current stream will be copied.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="System.Threading.CancellationToken.None" />.</param>
        /// <returns>A task that represents the asynchronous copy operation.</returns>
        public virtual Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
        {
            if (destination is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
            }
 
            if (cancellationToken.IsCancellationRequested)
            {
                return Task.FromCanceled(cancellationToken);
            }
 
            return CopyToAsyncCore(
                destination,
                (destination, memory, cancellationToken) => destination.WriteAsync(memory, cancellationToken),
                cancellationToken);
        }
 
        /// <summary>Asynchronously reads the bytes from the <see cref="System.IO.Pipelines.PipeReader" /> and writes them to the specified stream, using a specified cancellation token.</summary>
        /// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="System.Threading.CancellationToken.None" />.</param>
        /// <returns>A task that represents the asynchronous copy operation.</returns>
        public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
        {
            if (destination is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
            }
 
            if (cancellationToken.IsCancellationRequested)
            {
                return Task.FromCanceled(cancellationToken);
            }
 
            return CopyToAsyncCore(destination, (destination, memory, cancellationToken) =>
            {
                ValueTask task = destination.WriteAsync(memory, cancellationToken);
 
                if (task.IsCompletedSuccessfully)
                {
                    task.GetAwaiter().GetResult();
                    return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false));
                }
 
                static async ValueTask<FlushResult> Awaited(ValueTask writeTask)
                {
                    await writeTask.ConfigureAwait(false);
                    return new FlushResult(isCanceled: false, isCompleted: false);
                }
 
                return Awaited(task);
            },
            cancellationToken);
        }
 
        private async Task CopyToAsyncCore<TStream>(TStream destination, Func<TStream, ReadOnlyMemory<byte>, CancellationToken, ValueTask<FlushResult>> writeAsync, CancellationToken cancellationToken)
        {
            while (true)
            {
                ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;
                SequencePosition position = buffer.Start;
                SequencePosition consumed = position;
 
                try
                {
                    if (result.IsCanceled)
                    {
                        ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
                    }
 
                    while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
                    {
                        if (memory.IsEmpty)
                        {
                            // advance tracking only (to account for any boundary scenarios)
                            consumed = position;
                        }
                        else
                        {
                            // write and advance
                            FlushResult flushResult = await writeAsync(destination, memory, cancellationToken).ConfigureAwait(false);
 
                            if (flushResult.IsCanceled)
                            {
                                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
                            }
 
                            consumed = position;
 
                            if (flushResult.IsCompleted)
                            {
                                return;
                            }
                        }
                    }
 
                    // The while loop completed successfully, so we've consumed the entire buffer.
                    consumed = buffer.End;
 
                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    // Advance even if WriteAsync throws so the PipeReader is not left in the
                    // currently reading state
                    AdvanceTo(consumed);
                }
            }
        }
    }
}