File: Internal\Http\Http1ContentLengthMessageBody.cs
Web Access
Project: src\src\Servers\Kestrel\Core\src\Microsoft.AspNetCore.Server.Kestrel.Core.csproj (Microsoft.AspNetCore.Server.Kestrel.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.Globalization;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Connections;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
 
internal sealed class Http1ContentLengthMessageBody : Http1MessageBody
{
    private ReadResult _readResult;
    private readonly long _contentLength;
    private long _unexaminedInputLength;
    private bool _readCompleted;
    private bool _isReading;
    private int _userCanceled;
    private bool _finalAdvanceCalled;
    private bool _cannotResetInputPipe;
 
    public Http1ContentLengthMessageBody(Http1Connection context, long contentLength, bool keepAlive)
        : base(context, keepAlive)
    {
        _contentLength = contentLength;
        _unexaminedInputLength = contentLength;
    }
 
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
    public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
    {
        VerifyIsNotReading();
 
        if (_readCompleted)
        {
            _isReading = true;
            return new ReadResult(_readResult.Buffer, Interlocked.Exchange(ref _userCanceled, 0) == 1, isCompleted: true);
        }
 
        // The issue is that TryRead can get a canceled read result
        // which is unknown to StartTimingReadAsync.
        if (_context.RequestTimedOut)
        {
            KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
        }
 
        await TryStartAsync();
 
        // The while(true) loop is required because the Http1 connection calls CancelPendingRead to unblock
        // the call to StartTimingReadAsync to check if the request timed out.
        // However, if the user called CancelPendingRead, we want that to return a canceled ReadResult
        // We internally track an int for that.
        while (true)
        {
            try
            {
                var readAwaitable = _context.Input.ReadAsync(cancellationToken);
 
                _isReading = true;
                _readResult = await StartTimingReadAsync(readAwaitable, cancellationToken);
            }
            catch (ConnectionAbortedException ex)
            {
                _isReading = false;
                throw new TaskCanceledException("The request was aborted", ex);
            }
 
            void ResetReadingState()
            {
                // Reset the timing read here for the next call to read.
                StopTimingRead(0);
 
                if (!_cannotResetInputPipe)
                {
                    _isReading = false;
                    _context.Input.AdvanceTo(_readResult.Buffer.Start);
                }
            }
 
            if (_context.RequestTimedOut)
            {
                ResetReadingState();
                KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
            }
 
            if (_readResult.IsCompleted)
            {
                ResetReadingState();
                ThrowUnexpectedEndOfRequestContent();
            }
 
            // Ignore the canceled readResult if it wasn't canceled by the user.
            // Normally we do not return a canceled ReadResult unless CancelPendingRead was called on the request body PipeReader itself,
            // but if the last call to AdvanceTo examined data it did not consume, we cannot reset the state of the Input pipe.
            // https://github.com/dotnet/aspnetcore/issues/19476
            if (!_readResult.IsCanceled || Interlocked.Exchange(ref _userCanceled, 0) == 1 || _cannotResetInputPipe)
            {
                var returnedReadResultLength = CreateReadResultFromConnectionReadResult();
 
                // Don't count bytes belonging to the next request, since read rate timeouts are done on a per-request basis.
                StopTimingRead(returnedReadResultLength);
 
                if (_readResult.IsCompleted)
                {
                    TryStop();
                }
 
                break;
            }
 
            ResetReadingState();
        }
 
        return _readResult;
    }
 
    public override bool TryReadInternal(out ReadResult readResult)
    {
        VerifyIsNotReading();
 
        if (_readCompleted)
        {
            _isReading = true;
            readResult = new ReadResult(_readResult.Buffer, Interlocked.Exchange(ref _userCanceled, 0) == 1, isCompleted: true);
            return true;
        }
 
        if (_context.RequestTimedOut)
        {
            KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
        }
 
        TryStartAsync();
 
        // The while(true) because we don't want to return a canceled ReadResult if the user themselves didn't cancel it.
        while (true)
        {
            if (!_context.Input.TryRead(out _readResult))
            {
                readResult = default;
                return false;
            }
 
            if (!_readResult.IsCanceled || Interlocked.Exchange(ref _userCanceled, 0) == 1 || _cannotResetInputPipe)
            {
                break;
            }
 
            _context.Input.AdvanceTo(_readResult.Buffer.Start);
        }
 
        if (_readResult.IsCompleted)
        {
            if (_cannotResetInputPipe)
            {
                _isReading = true;
            }
            else
            {
                _context.Input.AdvanceTo(_readResult.Buffer.Start);
            }
 
            ThrowUnexpectedEndOfRequestContent();
        }
 
        var returnedReadResultLength = CreateReadResultFromConnectionReadResult();
 
        // Don't count bytes belonging to the next request, since read rate timeouts are done on a per-request basis.
        CountBytesRead(returnedReadResultLength);
 
        // Only set _isReading if we are returning true.
        _isReading = true;
        readResult = _readResult;
 
        if (readResult.IsCompleted)
        {
            TryStop();
        }
 
        return true;
    }
 
    private long CreateReadResultFromConnectionReadResult()
    {
        var initialLength = _readResult.Buffer.Length;
        var maxLength = _unexaminedInputLength + _examinedUnconsumedBytes;
 
        if (initialLength < maxLength)
        {
            return initialLength;
        }
 
        _readCompleted = true;
        _readResult = new ReadResult(
            _readResult.Buffer.Slice(0, maxLength),
            _readResult.IsCanceled,
            isCompleted: true);
 
        return maxLength;
    }
 
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        if (!_isReading)
        {
            throw new InvalidOperationException("No reading operation to complete.");
        }
 
        _isReading = false;
 
        // The current body is read, though there might be more bytes to read on the stream with pipelining
        if (_readCompleted)
        {
            var buffer = _readResult.Buffer.Slice(consumed, _readResult.Buffer.End);
 
            if (!_finalAdvanceCalled && buffer.IsEmpty)
            {
                // Don't reference the old buffer as it will be released by the pipe after calling AdvanceTo
                _readResult = new ReadResult(new ReadOnlySequence<byte>(), isCanceled: false, isCompleted: true);
 
                _context.Input.AdvanceTo(consumed);
                _finalAdvanceCalled = true;
                _context.OnTrailersComplete();
            }
            else
            {
                // If the old stored _readResult was canceled, it's already been observed. Do not store a canceled read result permanently.
                _readResult = new ReadResult(buffer, isCanceled: false, isCompleted: true);
            }
 
            return;
        }
 
        // If consumed != examined, we cannot reset _context.Input back to a non-reading state after the next call to ReadAsync
        // simply by calling _context.Input.AdvanceTo(_readResult.Buffer.Start) because the DefaultPipeReader will complain that
        // "The examined position cannot be less than the previously examined position."
        _cannotResetInputPipe = !consumed.Equals(examined);
        _unexaminedInputLength -= TrackConsumedAndExaminedBytes(_readResult, consumed, examined);
        _context.Input.AdvanceTo(consumed, examined);
    }
 
    protected override void OnReadStarting()
    {
        var maxRequestBodySize = _context.MaxRequestBodySize;
        if (_contentLength > maxRequestBodySize)
        {
            _context.DisableKeepAlive(ConnectionEndReason.MaxRequestBodySizeExceeded);
            KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTooLarge, maxRequestBodySize.GetValueOrDefault().ToString(CultureInfo.InvariantCulture));
        }
    }
 
    public override void CancelPendingRead()
    {
        Interlocked.Exchange(ref _userCanceled, 1);
        _context.Input.CancelPendingRead();
    }
 
    [StackTraceHidden]
    private void VerifyIsNotReading()
    {
        if (!_isReading)
        {
            return;
        }
 
        if (_cannotResetInputPipe)
        {
            if (_readResult.IsCompleted)
            {
                KestrelBadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
            }
 
            if (_context.RequestTimedOut)
            {
                KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
            }
        }
 
        throw new InvalidOperationException("Reading is already in progress.");
    }
}