File: ResponseBodyReaderStream.cs
Web Access
Project: src\src\Hosting\TestHost\src\Microsoft.AspNetCore.TestHost.csproj (Microsoft.AspNetCore.TestHost)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
 
namespace Microsoft.AspNetCore.TestHost;
 
/// <summary>
/// The client's view of the response body.
/// </summary>
internal sealed class ResponseBodyReaderStream : Stream
{
    private bool _readerComplete;
    private bool _aborted;
    private Exception? _abortException;
 
    private readonly object _abortLock = new object();
    private readonly Action _abortRequest;
    private readonly Action _readComplete;
    private readonly Pipe _pipe;
 
    internal ResponseBodyReaderStream(Pipe pipe, Action abortRequest, Action readComplete)
    {
        _pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
        _abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
        _readComplete = readComplete;
    }
 
    public override bool CanRead => true;
 
    public override bool CanSeek => false;
 
    public override bool CanWrite => false;
 
    #region NotSupported
 
    public override long Length => throw new NotSupportedException();
 
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
 
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
 
    public override void SetLength(long value) => throw new NotSupportedException();
 
    public override void Flush() => throw new NotSupportedException();
 
    public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
 
    // Write with count 0 will still trigger OnFirstWrite
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
 
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw new NotSupportedException();
 
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken) => throw new NotSupportedException();
 
    #endregion NotSupported
 
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }
 
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
    }
 
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        CheckAborted();
 
        if (_readerComplete)
        {
            return 0;
        }
 
        using var registration = cancellationToken.Register(Cancel);
        var result = await _pipe.Reader.ReadAsync(cancellationToken);
 
        if (result.IsCanceled)
        {
            throw new OperationCanceledException();
        }
 
        if (result.Buffer.IsEmpty && result.IsCompleted)
        {
            _readComplete();
            _readerComplete = true;
            return 0;
        }
 
        var readableBuffer = result.Buffer;
        var actual = Math.Min(readableBuffer.Length, buffer.Length);
        readableBuffer = readableBuffer.Slice(0, actual);
        readableBuffer.CopyTo(buffer.Span);
        _pipe.Reader.AdvanceTo(readableBuffer.End);
        return (int)actual;
    }
 
    internal void Cancel()
    {
        Abort(new OperationCanceledException());
    }
 
    internal void Abort(Exception innerException)
    {
        Debug.Assert(innerException != null);
 
        lock (_abortLock)
        {
            _abortException = innerException;
            _aborted = true;
        }
 
        _pipe.Reader.CancelPendingRead();
    }
 
    private void CheckAborted()
    {
        lock (_abortLock)
        {
            if (_aborted)
            {
                throw new IOException(string.Empty, _abortException);
            }
        }
    }
 
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _abortRequest();
        }
 
        _pipe.Reader.Complete();
 
        base.Dispose(disposing);
    }
}