// File: System\Net\Http\SocketsHttpHandler\ContentLengthReadStream.cs
// Web Access
// Project: src\src\libraries\System.Net.Http\src\System.Net.Http.csproj (System.Net.Http)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Net.Http
{
    internal sealed partial class HttpConnection : IDisposable
    {
        private sealed class ContentLengthReadStream : HttpContentReadStream
        {
            private ulong _contentBytesRemaining;
 
            /// <summary>
            /// Creates a read stream for a response body that still has
            /// <paramref name="contentLength"/> bytes to deliver.
            /// </summary>
            /// <param name="connection">Connection passed through to the base stream.</param>
            /// <param name="contentLength">Initial count of body bytes remaining; asserted to be non-zero.</param>
            public ContentLengthReadStream(HttpConnection connection, ulong contentLength)
                : base(connection)
            {
                // For an unsigned value "greater than zero" and "not zero" are the same check.
                Debug.Assert(contentLength != 0, "Caller should have checked for 0.");

                _contentBytesRemaining = contentLength;
            }
 
            /// <summary>
            /// Synchronously reads up to <c>buffer.Length</c> bytes of the response body, never
            /// requesting more than the number of body bytes still outstanding.
            /// </summary>
            /// <param name="buffer">Destination for the bytes read.</param>
            /// <returns>
            /// The number of bytes read, or 0 once the body has been fully consumed
            /// (signalled by <c>_connection</c> being null).
            /// </returns>
            /// <exception cref="HttpIOException">
            /// The connection returned no data for a non-empty request while body bytes were still expected.
            /// </exception>
            public override int Read(Span<byte> buffer)
            {
                if (_connection is null)
                {
                    // Nothing left: the body was already read to completion.
                    return 0;
                }

                Debug.Assert(_contentBytesRemaining > 0);

                // Clamp the request so we cannot read past the end of this response body.
                if (_contentBytesRemaining < (ulong)buffer.Length)
                {
                    buffer = buffer[..(int)_contentBytesRemaining];
                }

                int count = _connection.Read(buffer);
                if (count <= 0 && !buffer.IsEmpty)
                {
                    // The stream ended before the advertised number of bytes arrived.
                    throw new HttpIOException(HttpRequestError.ResponseEnded, SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _contentBytesRemaining));
                }

                Debug.Assert((ulong)count <= _contentBytesRemaining);
                _contentBytesRemaining -= (ulong)count;

                if (_contentBytesRemaining != 0)
                {
                    return count;
                }

                // That was the last of the body: complete the response and detach from the connection.
                _connection.CompleteResponse();
                _connection = null;
                return count;
            }
 
            /// <summary>
            /// Asynchronously reads up to <c>buffer.Length</c> bytes of the response body, never
            /// requesting more than the number of body bytes still outstanding.
            /// </summary>
            /// <param name="buffer">Destination for the bytes read.</param>
            /// <param name="cancellationToken">
            /// Checked on entry, registered with the connection while a read is pending,
            /// and checked again if the read yields no data.
            /// </param>
            /// <returns>
            /// The number of bytes read, or 0 once the body has been fully consumed
            /// (signalled by <c>_connection</c> being null).
            /// </returns>
            /// <exception cref="HttpIOException">
            /// The connection returned no data for a non-empty request while body bytes were still
            /// expected and cancellation had not been requested.
            /// </exception>
            public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
            {
                CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

                if (_connection is null)
                {
                    // Nothing left: the body was already read to completion.
                    return 0;
                }

                Debug.Assert(_contentBytesRemaining > 0);

                // Clamp the request so we cannot read past the end of this response body.
                if (_contentBytesRemaining < (ulong)buffer.Length)
                {
                    buffer = buffer[..(int)_contentBytesRemaining];
                }

                int bytesRead;
                ValueTask<int> pendingRead = _connection.ReadAsync(buffer);
                if (!pendingRead.IsCompletedSuccessfully)
                {
                    // The read did not finish synchronously; hook up cancellation only for
                    // the duration of the await.
                    CancellationTokenRegistration registration = _connection.RegisterCancellation(cancellationToken);
                    try
                    {
                        bytesRead = await pendingRead.ConfigureAwait(false);
                    }
                    catch (Exception exc) when (CancellationHelper.ShouldWrapInOperationCanceledException(exc, cancellationToken))
                    {
                        throw CancellationHelper.CreateOperationCanceledException(exc, cancellationToken);
                    }
                    finally
                    {
                        registration.Dispose();
                    }
                }
                else
                {
                    // Fast path: the result is already available.
                    bytesRead = pendingRead.Result;
                }

                if (bytesRead == 0 && !buffer.IsEmpty)
                {
                    // An EOF here might be the side effect of a cancellation request, so
                    // surface that first.
                    CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

                    // Otherwise the stream ended before the advertised number of bytes arrived.
                    throw new HttpIOException(HttpRequestError.ResponseEnded, SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _contentBytesRemaining));
                }

                Debug.Assert((ulong)bytesRead <= _contentBytesRemaining);
                _contentBytesRemaining -= (ulong)bytesRead;

                if (_contentBytesRemaining != 0)
                {
                    return bytesRead;
                }

                // That was the last of the body: complete the response and detach from the connection.
                _connection.CompleteResponse();
                _connection = null;
                return bytesRead;
            }
 
            /// <summary>
            /// Copies the remaining response body to <paramref name="destination"/>.
            /// </summary>
            /// <param name="destination">Stream that receives the body bytes.</param>
            /// <param name="bufferSize">Buffer size forwarded to the connection's copy routine.</param>
            /// <param name="cancellationToken">Token observed before starting and while the copy is pending.</param>
            /// <returns>
            /// A canceled task if cancellation was already requested, a completed task if the body
            /// was already consumed or the copy finished synchronously, otherwise a task that
            /// completes when the pending copy does.
            /// </returns>
            public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
            {
                ValidateCopyToArguments(destination, bufferSize);

                if (cancellationToken.IsCancellationRequested)
                {
                    return Task.FromCanceled(cancellationToken);
                }

                if (_connection is null)
                {
                    // The body has already been read in full, so there is nothing to copy.
                    return Task.CompletedTask;
                }

                Task pendingCopy = _connection.CopyToContentLengthAsync(destination, async: true, _contentBytesRemaining, bufferSize, cancellationToken);
                if (!pendingCopy.IsCompletedSuccessfully)
                {
                    // Still running (or faulted/canceled): let the async helper await it.
                    return CompleteCopyToAsync(pendingCopy, cancellationToken);
                }

                // Completed synchronously: wrap up without allocating an async state machine.
                Finish();
                return Task.CompletedTask;
            }
 
            /// <summary>
            /// Awaits a copy started by <see cref="CopyToAsync"/> with cancellation registered on
            /// the connection, then marks the response as finished.
            /// </summary>
            /// <param name="copyTask">The in-flight copy operation.</param>
            /// <param name="cancellationToken">Token registered with the connection for the duration of the await.</param>
            private async Task CompleteCopyToAsync(Task copyTask, CancellationToken cancellationToken)
            {
                Debug.Assert(_connection is not null);

                CancellationTokenRegistration registration = _connection.RegisterCancellation(cancellationToken);
                try
                {
                    await copyTask.ConfigureAwait(false);
                }
                catch (Exception exc) when (CancellationHelper.ShouldWrapInOperationCanceledException(exc, cancellationToken))
                {
                    // Translate the failure into an OperationCanceledException when the helper says so.
                    throw CancellationHelper.CreateOperationCanceledException(exc, cancellationToken);
                }
                finally
                {
                    registration.Dispose();
                }

                // Only reached when the copy succeeded.
                Finish();
            }
 
            /// <summary>
            /// Marks the body as fully consumed: zeroes the remaining byte count, completes the
            /// response on the connection, and clears this stream's connection reference.
            /// </summary>
            private void Finish()
            {
                _contentBytesRemaining = 0;

                HttpConnection connection = _connection!;
                connection.CompleteResponse();

                // A null connection is how the read paths recognize "nothing left to read".
                _connection = null;
            }
 
            // Modeled on ReadChunkFromConnectionBuffer; the two are candidates for a shared routine.
            /// <summary>
            /// Consumes bytes that are already sitting in the connection's buffer, bounded by
            /// <paramref name="maxBytesToRead"/> and by the number of body bytes still outstanding.
            /// </summary>
            /// <param name="maxBytesToRead">Upper bound on the number of bytes to consume; asserted positive.</param>
            /// <returns>
            /// The consumed slice of the connection's buffer, or <c>default</c> when the buffer is empty.
            /// </returns>
            private ReadOnlyMemory<byte> ReadFromConnectionBuffer(int maxBytesToRead)
            {
                Debug.Assert(maxBytesToRead > 0);
                Debug.Assert(_contentBytesRemaining > 0);
                Debug.Assert(_connection != null);

                ReadOnlyMemory<byte> buffered = _connection.RemainingBuffer;
                if (buffered.IsEmpty)
                {
                    return default;
                }

                // Limit first by what is buffered and what the body still owes, then by the caller's cap.
                ulong availableForBody = Math.Min((ulong)buffered.Length, _contentBytesRemaining);
                int take = Math.Min(maxBytesToRead, (int)availableForBody);
                Debug.Assert(take > 0);

                _connection.ConsumeFromRemainingBuffer(take);
                _contentBytesRemaining -= (ulong)take;

                return buffered[..take];
            }
 
            /// <summary>
            /// Reports whether this stream needs draining; mirrors <c>CanReadFromConnection</c>.
            /// </summary>
            public override bool NeedsDrain
            {
                get { return CanReadFromConnection; }
            }
 
            /// <summary>
            /// Attempts to read and discard the rest of the response body.
            /// </summary>
            /// <param name="maxDrainBytes">
            /// Largest number of not-yet-buffered body bytes this method is willing to drain.
            /// </param>
            /// <returns>
            /// <c>true</c> when all remaining body bytes were consumed and the response was finished;
            /// <c>false</c> when more than <paramref name="maxDrainBytes"/> bytes remain after consuming
            /// what is already buffered, or when the configured drain time is zero.
            /// </returns>
            public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
            {
                Debug.Assert(_connection != null);
                Debug.Assert(_contentBytesRemaining > 0);

                // Discard whatever part of the body is already in the connection's buffer;
                // the returned slice is intentionally ignored.
                ReadFromConnectionBuffer(int.MaxValue);
                if (_contentBytesRemaining == 0)
                {
                    Finish();
                    return true;
                }

                // Too much body left to be worth draining.
                if (_contentBytesRemaining > (ulong)maxDrainBytes)
                {
                    return false;
                }

                CancellationTokenSource? cts = null;
                CancellationTokenRegistration ctr = default;
                TimeSpan drainTime = _connection._pool.Settings._maxResponseDrainTime;

                // A zero drain time means no time is allowed for draining at all.
                if (drainTime == TimeSpan.Zero)
                {
                    return false;
                }

                if (drainTime != Timeout.InfiniteTimeSpan)
                {
                    // Finite drain time: when the timer fires, the callback disposes the connection.
                    // NOTE(review): presumably that makes the pending FillAsync below fail — confirm.
                    cts = new CancellationTokenSource((int)drainTime.TotalMilliseconds);
                    ctr = cts.Token.Register(static s => ((HttpConnection)s!).Dispose(), _connection);
                }

                try
                {
                    // Keep refilling the connection buffer and discarding body bytes until none remain.
                    while (true)
                    {
                        await _connection.FillAsync(async: true).ConfigureAwait(false);
                        ReadFromConnectionBuffer(int.MaxValue);
                        if (_contentBytesRemaining == 0)
                        {
                            // Dispose of the registration and then check whether cancellation has been
                            // requested. This is necessary to make deterministic a race condition between
                            // cancellation being requested and unregistering from the token.  Otherwise,
                            // it's possible cancellation could be requested just before we unregister and
                            // we then return a connection to the pool that has been or will be disposed
                            // (e.g. if a timer is used and has already queued its callback but the
                            // callback hasn't yet run).
                            ctr.Dispose();
                            CancellationHelper.ThrowIfCancellationRequested(ctr.Token);

                            Finish();
                            return true;
                        }
                    }
                }
                finally
                {
                    // Runs on every exit path; on the success path ctr.Dispose() is called a second time here.
                    ctr.Dispose();
                    cts?.Dispose();
                }
            }
        }
    }
}