File: System\Net\Http\SocketsHttpHandler\ChunkedEncodingReadStream.cs
Web Access
Project: src\src\libraries\System.Net.Http\src\System.Net.Http.csproj (System.Net.Http)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers.Text;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Net.Http
{
    internal sealed partial class HttpConnection
    {
        private sealed class ChunkedEncodingReadStream : HttpContentReadStream
        {
            /// <summary>The number of bytes remaining in the chunk.</summary>
            private ulong _chunkBytesRemaining;
            /// <summary>The current state of the parsing state machine for the chunked response.</summary>
            private ParsingState _state = ParsingState.ExpectChunkHeader;
            private readonly HttpResponseMessage _response;
 
            public ChunkedEncodingReadStream(HttpConnection connection, HttpResponseMessage response) : base(connection)
            {
                Debug.Assert(response != null, "The HttpResponseMessage cannot be null.");
                _response = response;
            }
 
            public override int Read(Span<byte> buffer)
            {
                if (_connection == null)
                {
                    // Response body fully consumed
                    return 0;
                }
 
                if (buffer.Length == 0)
                {
                    if (PeekChunkFromConnectionBuffer())
                    {
                        return 0;
                    }
                }
                else
                {
                    // Try to consume from data we already have in the buffer.
                    int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
                    if (bytesRead > 0)
                    {
                        return bytesRead;
                    }
                }
 
                // Nothing available to consume.  Fall back to I/O.
                while (true)
                {
                    if (_connection == null)
                    {
                        // Fully consumed the response in ReadChunksFromConnectionBuffer.
                        return 0;
                    }
 
                    if (_state == ParsingState.ExpectChunkData &&
                        buffer.Length >= _connection.ReadBufferSize &&
                        _chunkBytesRemaining >= (ulong)_connection.ReadBufferSize)
                    {
                        // As an optimization, we skip going through the connection's read buffer if both
                        // the remaining chunk data and the buffer are both at least as large
                        // as the connection buffer.  That avoids an unnecessary copy while still reading
                        // the maximum amount we'd otherwise read at a time.
                        Debug.Assert(_connection.RemainingBuffer.Length == 0);
                        Debug.Assert(buffer.Length != 0);
                        int bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
                        if (bytesRead == 0)
                        {
                            throw new HttpIOException(HttpRequestError.ResponseEnded, SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _chunkBytesRemaining));
                        }
                        _chunkBytesRemaining -= (ulong)bytesRead;
                        if (_chunkBytesRemaining == 0)
                        {
                            _state = ParsingState.ExpectChunkTerminator;
                        }
                        return bytesRead;
                    }
 
                    if (buffer.Length == 0)
                    {
                        // User requested a zero-byte read, and we have no data available in the buffer for processing.
                        // This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
                        // for reduced memory consumption when data is not immediately available.
                        // So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
                        // optimizations, such as deferring buffer allocation until data is actually available.
                        _connection.Read(buffer);
                    }
 
                    // We're only here if we need more data to make forward progress.
                    Fill();
 
                    // Now that we have more, see if we can get any response data, and if
                    // we can we're done.
                    if (buffer.Length == 0)
                    {
                        if (PeekChunkFromConnectionBuffer())
                        {
                            return 0;
                        }
                    }
                    else
                    {
                        int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
                        if (bytesCopied > 0)
                        {
                            return bytesCopied;
                        }
                    }
                }
            }
 
            public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // Cancellation requested.
                    return ValueTask.FromCanceled<int>(cancellationToken);
                }
 
                if (_connection == null)
                {
                    // Response body fully consumed
                    return new ValueTask<int>(0);
                }
 
                if (buffer.Length == 0)
                {
                    if (PeekChunkFromConnectionBuffer())
                    {
                        return new ValueTask<int>(0);
                    }
                }
                else
                {
                    // Try to consume from data we already have in the buffer.
                    int bytesRead = ReadChunksFromConnectionBuffer(buffer.Span, cancellationRegistration: default);
                    if (bytesRead > 0)
                    {
                        return new ValueTask<int>(bytesRead);
                    }
                }
 
                // We may have just consumed the remainder of the response (with no actual data
                // available), so check again.
                if (_connection == null)
                {
                    Debug.Assert(_state == ParsingState.Done);
                    return new ValueTask<int>(0);
                }
 
                // Nothing available to consume.  Fall back to I/O.
                return ReadAsyncCore(buffer, cancellationToken);
            }
 
            private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
            {
                // Should only be called if ReadChunksFromConnectionBuffer returned 0.
 
                Debug.Assert(_connection != null);
 
                CancellationTokenRegistration ctr = _connection.RegisterCancellation(cancellationToken);
                try
                {
                    while (true)
                    {
                        if (_connection == null)
                        {
                            // Fully consumed the response in ReadChunksFromConnectionBuffer.
                            return 0;
                        }
 
                        if (_state == ParsingState.ExpectChunkData &&
                            buffer.Length >= _connection.ReadBufferSize &&
                            _chunkBytesRemaining >= (ulong)_connection.ReadBufferSize)
                        {
                            // As an optimization, we skip going through the connection's read buffer if both
                            // the remaining chunk data and the buffer are both at least as large
                            // as the connection buffer.  That avoids an unnecessary copy while still reading
                            // the maximum amount we'd otherwise read at a time.
                            Debug.Assert(_connection.RemainingBuffer.Length == 0);
                            Debug.Assert(buffer.Length != 0);
                            int bytesRead = await _connection.ReadAsync(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining))).ConfigureAwait(false);
                            if (bytesRead == 0)
                            {
                                throw new HttpIOException(HttpRequestError.ResponseEnded, SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _chunkBytesRemaining));
                            }
                            _chunkBytesRemaining -= (ulong)bytesRead;
                            if (_chunkBytesRemaining == 0)
                            {
                                _state = ParsingState.ExpectChunkTerminator;
                            }
                            return bytesRead;
                        }
 
                        if (buffer.Length == 0)
                        {
                            // User requested a zero-byte read, and we have no data available in the buffer for processing.
                            // This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
                            // for reduced memory consumption when data is not immediately available.
                            // So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
                            // optimizations, such as deferring buffer allocation until data is actually available.
                            await _connection.ReadAsync(buffer).ConfigureAwait(false);
                        }
 
                        // We're only here if we need more data to make forward progress.
                        await FillAsync().ConfigureAwait(false);
 
                        // Now that we have more, see if we can get any response data, and if
                        // we can we're done.
                        if (buffer.Length == 0)
                        {
                            if (PeekChunkFromConnectionBuffer())
                            {
                                return 0;
                            }
                        }
                        else
                        {
                            int bytesCopied = ReadChunksFromConnectionBuffer(buffer.Span, ctr);
                            if (bytesCopied > 0)
                            {
                                return bytesCopied;
                            }
                        }
                    }
                }
                catch (Exception exc) when (CancellationHelper.ShouldWrapInOperationCanceledException(exc, cancellationToken))
                {
                    throw CancellationHelper.CreateOperationCanceledException(exc, cancellationToken);
                }
                finally
                {
                    ctr.Dispose();
                }
            }
 
            public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
            {
                ValidateCopyToArguments(destination, bufferSize);
 
                return
                    cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) :
                    _connection == null ? Task.CompletedTask :
                    CopyToAsyncCore(destination, cancellationToken);
            }
 
            private async Task CopyToAsyncCore(Stream destination, CancellationToken cancellationToken)
            {
                CancellationTokenRegistration ctr = _connection!.RegisterCancellation(cancellationToken);
                try
                {
                    while (true)
                    {
                        while (true)
                        {
                            if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
                            {
                                break;
                            }
                            await destination.WriteAsync(bytesRead, cancellationToken).ConfigureAwait(false);
                        }
 
                        if (_connection == null)
                        {
                            // Fully consumed the response.
                            return;
                        }
 
                        await FillAsync().ConfigureAwait(false);
                    }
                }
                catch (Exception exc) when (CancellationHelper.ShouldWrapInOperationCanceledException(exc, cancellationToken))
                {
                    throw CancellationHelper.CreateOperationCanceledException(exc, cancellationToken);
                }
                finally
                {
                    ctr.Dispose();
                }
            }
 
            private bool PeekChunkFromConnectionBuffer()
            {
                return ReadChunkFromConnectionBuffer(maxBytesToRead: 0, cancellationRegistration: default).HasValue;
            }
 
            private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenRegistration cancellationRegistration)
            {
                Debug.Assert(buffer.Length > 0);
                int totalBytesRead = 0;
                while (buffer.Length > 0)
                {
                    if (ReadChunkFromConnectionBuffer(buffer.Length, cancellationRegistration) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
                    {
                        break;
                    }
 
                    Debug.Assert(bytesRead.Length <= buffer.Length);
                    totalBytesRead += bytesRead.Length;
                    bytesRead.Span.CopyTo(buffer);
                    buffer = buffer.Slice(bytesRead.Length);
                }
                return totalBytesRead;
            }
 
            private ReadOnlyMemory<byte>? ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
            {
                Debug.Assert(_connection != null);
 
                try
                {
                    ReadOnlySpan<byte> currentLine;
                    switch (_state)
                    {
                        case ParsingState.ExpectChunkHeader:
                            Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");
 
                            // Read the chunk header line.
                            if (!_connection.TryReadNextChunkedLine(out currentLine))
                            {
                                // Could not get a whole line, so we can't parse the chunk header.
                                return default;
                            }
 
                            // Parse the hex value from it.
                            if (!Utf8Parser.TryParse(currentLine, out ulong chunkSize, out int bytesConsumed, 'X'))
                            {
                                throw new HttpIOException(HttpRequestError.InvalidResponse, SR.Format(SR.net_http_invalid_response_chunk_header_invalid, BitConverter.ToString(currentLine.ToArray())));
                            }
                            _chunkBytesRemaining = chunkSize;
 
                            // If there's a chunk extension after the chunk size, validate it.
                            if (bytesConsumed != currentLine.Length)
                            {
                                ValidateChunkExtension(currentLine.Slice(bytesConsumed));
                            }
 
                            // Proceed to handle the chunk.  If there's data in it, go read it.
                            // Otherwise, finish handling the response.
                            if (chunkSize > 0)
                            {
                                _state = ParsingState.ExpectChunkData;
                                goto case ParsingState.ExpectChunkData;
                            }
                            else
                            {
                                _state = ParsingState.ConsumeTrailers;
                                goto case ParsingState.ConsumeTrailers;
                            }
 
                        case ParsingState.ExpectChunkData:
                            Debug.Assert(_chunkBytesRemaining > 0);
 
                            ReadOnlyMemory<byte> connectionBuffer = _connection.RemainingBuffer;
                            if (connectionBuffer.Length == 0)
                            {
                                return default;
                            }
 
                            int bytesToConsume = Math.Min(maxBytesToRead, (int)Math.Min((ulong)connectionBuffer.Length, _chunkBytesRemaining));
                            Debug.Assert(bytesToConsume > 0 || maxBytesToRead == 0);
 
                            _connection.ConsumeFromRemainingBuffer(bytesToConsume);
                            _chunkBytesRemaining -= (ulong)bytesToConsume;
                            if (_chunkBytesRemaining == 0)
                            {
                                _state = ParsingState.ExpectChunkTerminator;
                            }
 
                            return connectionBuffer.Slice(0, bytesToConsume);
 
                        case ParsingState.ExpectChunkTerminator:
                            Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");
 
                            if (!_connection.TryReadNextChunkedLine(out currentLine))
                            {
                                return default;
                            }
 
                            if (currentLine.Length != 0)
                            {
                                throw new HttpIOException(HttpRequestError.InvalidResponse, SR.Format(SR.net_http_invalid_response_chunk_terminator_invalid, Encoding.ASCII.GetString(currentLine)));
                            }
 
                            _state = ParsingState.ExpectChunkHeader;
                            goto case ParsingState.ExpectChunkHeader;
 
                        case ParsingState.ConsumeTrailers:
                            Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");
 
                            // Consume the receive buffer. If the stream is disposed, pass a null response to avoid
                            // processing headers for a connection returned to the pool.
                            if (_connection.ParseHeaders(IsDisposed ? null : _response, isFromTrailer: true))
                            {
                                // Dispose of the registration and then check whether cancellation has been
                                // requested. This is necessary to make deterministic a race condition between
                                // cancellation being requested and unregistering from the token.  Otherwise,
                                // it's possible cancellation could be requested just before we unregister and
                                // we then return a connection to the pool that has been or will be disposed
                                // (e.g. if a timer is used and has already queued its callback but the
                                // callback hasn't yet run).
                                cancellationRegistration.Dispose();
                                CancellationHelper.ThrowIfCancellationRequested(cancellationRegistration.Token);
 
                                _state = ParsingState.Done;
                                _connection.CompleteResponse();
                                _connection = null;
                            }
 
                            return default;
 
                        default:
                        case ParsingState.Done: // shouldn't be called once we're done
                            Debug.Fail($"Unexpected state: {_state}");
                            if (NetEventSource.Log.IsEnabled())
                            {
                                NetEventSource.Error(this, $"Unexpected state: {_state}");
                            }
 
                            return default;
                    }
                }
                catch (Exception)
                {
                    // Ensure we don't try to read from the connection again (in particular, for draining)
                    _connection!.Dispose();
                    _connection = null;
                    throw;
                }
            }
 
            private static void ValidateChunkExtension(ReadOnlySpan<byte> lineAfterChunkSize)
            {
                // Until we see the ';' denoting the extension, the line after the chunk size
                // must contain only tabs and spaces.  After the ';', anything goes.
                for (int i = 0; i < lineAfterChunkSize.Length; i++)
                {
                    byte c = lineAfterChunkSize[i];
                    if (c == ';')
                    {
                        break;
                    }
                    else if (c != ' ' && c != '\t') // not called out in the RFC, but WinHTTP allows it
                    {
                        throw new HttpIOException(HttpRequestError.InvalidResponse, SR.Format(SR.net_http_invalid_response_chunk_extension_invalid, BitConverter.ToString(lineAfterChunkSize.ToArray())));
                    }
                }
            }
 
            private enum ParsingState : byte
            {
                ExpectChunkHeader,
                ExpectChunkData,
                ExpectChunkTerminator,
                ConsumeTrailers,
                Done
            }
 
            public override bool NeedsDrain => CanReadFromConnection;
 
            public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
            {
                Debug.Assert(_connection != null);
 
                CancellationTokenSource? cts = null;
                CancellationTokenRegistration ctr = default;
                try
                {
                    int drainedBytes = 0;
                    while (true)
                    {
                        drainedBytes += _connection.RemainingBuffer.Length;
                        while (true)
                        {
                            if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
                            {
                                break;
                            }
                        }
 
                        // When ReadChunkFromConnectionBuffer reads the final chunk, it will clear out _connection
                        // and return the connection to the pool.
                        if (_connection == null)
                        {
                            return true;
                        }
 
                        if (drainedBytes >= maxDrainBytes)
                        {
                            return false;
                        }
 
                        if (cts == null) // only create the drain timer if we have to go async
                        {
                            TimeSpan drainTime = _connection._pool.Settings._maxResponseDrainTime;
 
                            if (drainTime == TimeSpan.Zero)
                            {
                                return false;
                            }
 
                            if (drainTime != Timeout.InfiniteTimeSpan)
                            {
                                cts = new CancellationTokenSource((int)drainTime.TotalMilliseconds);
                                ctr = cts.Token.Register(static s => ((HttpConnection)s!).Dispose(), _connection);
                            }
                        }
 
                        await FillAsync().ConfigureAwait(false);
                    }
                }
                finally
                {
                    ctr.Dispose();
                    cts?.Dispose();
                }
            }
 
            private void Fill()
            {
                Debug.Assert(_connection is not null);
                ValueTask fillTask = _state == ParsingState.ConsumeTrailers
                    ? _connection.FillForHeadersAsync(async: false)
                    : _connection.FillAsync(async: false);
                Debug.Assert(fillTask.IsCompleted);
                fillTask.GetAwaiter().GetResult();
            }
 
            private ValueTask FillAsync()
            {
                Debug.Assert(_connection is not null);
                return _state == ParsingState.ConsumeTrailers
                    ? _connection.FillForHeadersAsync(async: true)
                    : _connection.FillAsync(async: true);
            }
        }
    }
}