// File: System\IO\Pipelines\PipeReaderStream.cs
// Web Access
// Project: src\src\libraries\System.IO.Pipelines\src\System.IO.Pipelines.csproj (System.IO.Pipelines)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.IO.Pipelines
{
    /// <summary>
    /// A <see cref="Stream"/> facade over a <see cref="PipeReader"/>. The stream is readable,
    /// but neither seekable nor writable; every read is served by the wrapped reader.
    /// </summary>
    internal sealed class PipeReaderStream : Stream
    {
        private readonly PipeReader _pipeReader;

        /// <summary>Creates a stream that pulls its bytes from <paramref name="pipeReader"/>.</summary>
        /// <param name="pipeReader">The reader to wrap; asserted to be non-null in debug builds.</param>
        /// <param name="leaveOpen">
        /// Initial value of <see cref="LeaveOpen"/>: when <c>true</c>, disposing the stream does not
        /// complete the reader.
        /// </param>
        public PipeReaderStream(PipeReader pipeReader, bool leaveOpen)
        {
            Debug.Assert(pipeReader != null);
            _pipeReader = pipeReader;
            LeaveOpen = leaveOpen;
        }

        /// <summary>
        /// Completes the wrapped reader unless <see cref="LeaveOpen"/> is set, then runs the base disposal.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            bool completeReader = !LeaveOpen;
            if (completeReader)
            {
                _pipeReader.Complete();
            }

            base.Dispose(disposing);
        }

        /// <summary>Always <c>true</c>.</summary>
        public override bool CanRead
        {
            get { return true; }
        }

        /// <summary>Always <c>false</c>.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>Always <c>false</c>.</summary>
        public override bool CanWrite
        {
            get { return false; }
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        /// <summary>Not supported; both accessors throw <see cref="NotSupportedException"/>.</summary>
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        /// <summary>Whether disposing this stream should leave the wrapped reader uncompleted.</summary>
        internal bool LeaveOpen { get; set; }

        /// <summary>Does nothing.</summary>
        public override void Flush()
        {
        }

        /// <summary>
        /// Synchronously reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (buffer is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
            }

            Span<byte> destination = new Span<byte>(buffer, offset, count);
            return ReadInternal(destination);
        }

        /// <summary>Reads a single byte.</summary>
        /// <returns>The byte value, or -1 when the underlying read produced no bytes.</returns>
        public override int ReadByte()
        {
            Span<byte> single = stackalloc byte[1];

            int bytesRead = ReadInternal(single);
            if (bytesRead == 0)
            {
                return -1;
            }

            return single[0];
        }

        /// <summary>
        /// Shared synchronous read path: starts a read on the reader, blocks for its result if it has
        /// not already completed successfully, and hands the result to <see cref="HandleReadResult"/>.
        /// </summary>
        private int ReadInternal(Span<byte> buffer)
        {
            ValueTask<ReadResult> pending = _pipeReader.ReadAsync();

            ReadResult readResult;
            if (pending.IsCompletedSuccessfully)
            {
                // Result is already available; take it directly.
                readResult = pending.Result;
            }
            else
            {
                // Block the calling thread until the read finishes (or faults).
                readResult = pending.AsTask().GetAwaiter().GetResult();
            }

            return HandleReadResult(readResult, buffer);
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Begin/End-pattern entry point; forwards to the array-based <c>ReadAsync</c> with a default
        /// cancellation token and wraps the task via <see cref="TaskToAsyncResult"/>.
        /// </summary>
        public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
        {
            Task<int> readTask = ReadAsync(buffer, offset, count, default);
            return TaskToAsyncResult.Begin(readTask, callback, state);
        }

        /// <summary>Completes an operation started by <see cref="BeginRead"/> and returns its byte count.</summary>
        public sealed override int EndRead(IAsyncResult asyncResult)
        {
            return TaskToAsyncResult.End<int>(asyncResult);
        }

        /// <summary>
        /// Asynchronously reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <returns>A task producing the number of bytes copied into <paramref name="buffer"/>.</returns>
        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (buffer is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
            }

            Memory<byte> destination = new Memory<byte>(buffer, offset, count);
            return ReadAsyncInternal(destination, cancellationToken).AsTask();
        }

#if (!NETSTANDARD2_0 && !NETFRAMEWORK)
        /// <summary>Span-based synchronous read; forwards to the shared synchronous read path.</summary>
        public override int Read(Span<byte> buffer)
        {
            return ReadInternal(buffer);
        }

        /// <summary>Memory-based asynchronous read; forwards to the shared asynchronous read path.</summary>
        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
        {
            return ReadAsyncInternal(buffer, cancellationToken);
        }
#endif

        /// <summary>
        /// Shared asynchronous read path: awaits a read from the reader (without capturing the
        /// context) and hands the result to <see cref="HandleReadResult"/>.
        /// </summary>
        private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
        {
            ValueTask<ReadResult> pending = _pipeReader.ReadAsync(cancellationToken);
            ReadResult readResult = await pending.ConfigureAwait(false);
            return HandleReadResult(readResult, buffer.Span);
        }

        /// <summary>
        /// Copies as many bytes from <paramref name="result"/> as fit into <paramref name="buffer"/>
        /// and advances the reader past exactly the bytes that were copied.
        /// </summary>
        /// <returns>
        /// The number of bytes copied, or 0 when the result carries no data and is marked completed.
        /// </returns>
        private int HandleReadResult(ReadResult result, Span<byte> buffer)
        {
            if (result.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
            }

            ReadOnlySequence<byte> data = result.Buffer;
            long available = data.Length;

            // Until something is copied, nothing is consumed: the reader is advanced to the start.
            SequencePosition consumedUpTo = data.Start;

            try
            {
                if (available == 0)
                {
                    if (result.IsCompleted)
                    {
                        return 0;
                    }

                    // No data and not completed: leave the try block (the finally still advances
                    // the reader) and report the invalid read below.
                }
                else
                {
                    int toCopy = (int)Math.Min(available, buffer.Length);

                    // Only slice when the destination cannot take the whole sequence.
                    ReadOnlySequence<byte> chunk = data;
                    if (toCopy != available)
                    {
                        chunk = data.Slice(0, toCopy);
                    }

                    consumedUpTo = chunk.End;
                    chunk.CopyTo(buffer);

                    return toCopy;
                }
            }
            finally
            {
                // Runs on every exit path from the try block, including the early returns.
                _pipeReader.AdvanceTo(consumedUpTo);
            }

            // Reaching this point means the PipeReader produced a zero-byte read without being
            // completed or canceled, which indicates a faulty PipeReader implementation.
            ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
            return 0;
        }

        /// <summary>
        /// Validates the arguments and then lets the wrapped reader copy its contents to
        /// <paramref name="destination"/>.
        /// </summary>
        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
        {
            StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

            // The PipeReader performs the copy itself; bufferSize is only validated here.
            return _pipeReader.CopyToAsync(destination, cancellationToken);
        }
    }
}