File: System\Net\Http\SocketsHttpHandler\Http2Stream.cs
Web Access
Project: src\src\libraries\System.Net.Http\src\System.Net.Http.csproj (System.Net.Http)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http.Headers;
using System.Net.Http.HPack;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
 
namespace System.Net.Http
{
    internal sealed partial class Http2Connection
    {
        private sealed class Http2Stream : IValueTaskSource, IHttpStreamHeadersHandler, IHttpTrace
        {
            private const int InitialStreamBufferSize =
#if DEBUG
                10;
#else
                1024;
#endif
 
            private static ReadOnlySpan<byte> StatusHeaderName => ":status"u8;
 
            private readonly Http2Connection _connection;
            private readonly HttpRequestMessage _request;
            private HttpResponseMessage? _response;
            /// <summary>Stores any trailers received after returning the response content to the caller.</summary>
            private HttpResponseHeaders? _trailers;
 
            private MultiArrayBuffer _responseBuffer; // mutable struct, do not make this readonly
            private Http2StreamWindowManager _windowManager;
            private CreditWaiter? _creditWaiter;
            private int _availableCredit;
            private readonly object _creditSyncObject = new object(); // split from SyncObject to avoid lock ordering problems with Http2Connection.SyncObject
 
            private StreamCompletionState _requestCompletionState;
            private StreamCompletionState _responseCompletionState;
            private ResponseProtocolState _responseProtocolState;
            private bool _responseHeadersReceived;
 
            // If this is not null, then we have received a reset from the server
            // (i.e. RST_STREAM or general IO error processing the connection)
            private Exception? _resetException;
            private bool _canRetry;             // if _resetException != null, this indicates the stream was refused and so the request is retryable
 
            // This flag indicates that, per section 8.1 of the RFC, the server completed the response and then sent a RST_STREAM with error = NO_ERROR.
            // This is a signal to stop sending the request body, but the request is still considered successful.
            private bool _requestBodyAbandoned;
 
            /// <summary>
            /// The core logic for the IValueTaskSource implementation.
            ///
            /// Thread-safety:
            /// _waitSource is used to coordinate between a producer indicating that something is available to process (either the connection's event loop
            /// or a cancellation request) and a consumer doing that processing.  There must only ever be a single consumer, namely this stream reading
            /// data associated with the response.  Because there is only ever at most one consumer, producers can trust that if _hasWaiter is true,
            /// until the _waitSource is then set, no consumer will attempt to reset the _waitSource.  A producer must still take SyncObj in order to
            /// coordinate with other producers (e.g. a race between data arriving from the event loop and cancellation being requested), but while holding
            /// the lock it can check whether _hasWaiter is true, and if it is, set _hasWaiter to false, exit the lock, and then set the _waitSource. Another
            /// producer coming along will then see _hasWaiter as false and will not attempt to concurrently set _waitSource (which would violate _waitSource's
            /// thread-safety), and no other consumer could come along in the interim, because _hasWaiter being true means that a consumer is already waiting
            /// for _waitSource to be set, and legally there can only be one consumer.  Once this producer sets _waitSource, the consumer could quickly loop
            /// around to wait again, but invariants have all been maintained in the interim, and the consumer would need to take the SyncObj lock in order to
            /// Reset _waitSource.
            /// </summary>
            private ManualResetValueTaskSourceCore<bool> _waitSource = new ManualResetValueTaskSourceCore<bool> { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly
            /// <summary>Cancellation registration used to cancel the <see cref="_waitSource"/>.</summary>
            private CancellationTokenRegistration _waitSourceCancellation;
            /// <summary>
            /// Whether code has requested or is about to request a wait be performed and thus requires a call to SetResult to complete it.
            /// This is read and written while holding the lock so that most operations on _waitSource don't need to be.
            /// </summary>
            private bool _hasWaiter;
 
            private readonly CancellationTokenSource? _requestBodyCancellationSource;
 
            private readonly TaskCompletionSource<bool>? _expect100ContinueWaiter;
 
            private int _headerBudgetRemaining;
 
            private bool _sendRstOnResponseClose;
 
            public Http2Stream(HttpRequestMessage request, Http2Connection connection)
            {
                _request = request;
                _connection = connection;
 
                _requestCompletionState = StreamCompletionState.InProgress;
                _responseCompletionState = StreamCompletionState.InProgress;
 
                _responseProtocolState = ResponseProtocolState.ExpectingStatus;
 
                _responseBuffer = new MultiArrayBuffer(InitialStreamBufferSize);
 
                _windowManager = new Http2StreamWindowManager(connection, this);
 
                _headerBudgetRemaining = connection._pool.Settings.MaxResponseHeadersByteLength;
 
                // Extended connect requests will use the response content stream for bidirectional communication.
                // We will ignore any content set for such requests in SendRequestBodyAsync, as it has no defined semantics.
                if (_request.Content == null || _request.IsExtendedConnectRequest)
                {
                    _requestCompletionState = StreamCompletionState.Completed;
                    if (_request.IsExtendedConnectRequest)
                    {
                        _requestBodyCancellationSource = new CancellationTokenSource();
                    }
                }
                else
                {
                    // Create this here because it can be canceled before SendRequestBodyAsync is even called.
                    // To avoid race conditions that can result in this being disposed in response to a server reset
                    // and then used to issue cancellation, we simply avoid disposing it; that's fine as long as we don't
                    // construct this via CreateLinkedTokenSource, in which case disposal is necessary to avoid a potential
                    // leak.  If how this is constructed ever changes, we need to revisit disposing it, such as by
                    // using synchronization (e.g. using an Interlocked.Exchange to "consume" the _requestBodyCancellationSource
                    // for either disposal or issuing cancellation).
                    _requestBodyCancellationSource = new CancellationTokenSource();
 
                    if (_request.HasHeaders && _request.Headers.ExpectContinue == true)
                    {
                        // Create a TCS for handling Expect: 100-continue semantics. See WaitFor100ContinueAsync.
                        // Note we need to create this in the constructor, because we can receive a 100 Continue response at any time after the constructor finishes.
                        _expect100ContinueWaiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                    }
                }
 
                _response = new HttpResponseMessage()
                {
                    Version = HttpVersion.Version20,
                    RequestMessage = _request,
                    Content = new HttpConnectionResponseContent()
                };
            }
 
            private object SyncObject => this; // this isn't handed out to code that may lock on it
 
            public void Initialize(int streamId, int initialWindowSize)
            {
                StreamId = streamId;
                _availableCredit = initialWindowSize;
                if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(initialWindowSize)}={initialWindowSize}");
            }
 
            public int StreamId { get; private set; }
 
            public bool SendRequestFinished => _requestCompletionState != StreamCompletionState.InProgress;
 
            public bool ExpectResponseData => _responseProtocolState == ResponseProtocolState.ExpectingData;
 
            public Http2Connection Connection => _connection;
 
            public bool ConnectProtocolEstablished { get; private set; }
 
            public HttpResponseMessage GetAndClearResponse()
            {
                // Once SendAsync completes, the Http2Stream should no longer hold onto the response message.
                // Since the Http2Stream is rooted by the Http2Connection dictionary, doing so would prevent
                // the response stream from being collected and finalized if it were to be dropped without
                // being disposed first.
                Debug.Assert(_response != null);
                HttpResponseMessage r = _response;
                _response = null;
                return r;
            }
 
            public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
            {
                // Extended connect requests will use the response content stream for bidirectional communication.
                // Ignore any content set for such requests, as it has no defined semantics.
                if (_request.Content == null || _request.IsExtendedConnectRequest)
                {
                    Debug.Assert(_requestCompletionState == StreamCompletionState.Completed);
                    return;
                }
 
                if (NetEventSource.Log.IsEnabled()) Trace($"{_request.Content}");
                Debug.Assert(_requestBodyCancellationSource != null);
 
                // Cancel the request body sending if cancellation is requested on the supplied cancellation token.
                // Normally we might create a linked token, but once cancellation is requested, we can't recover anyway,
                // so it's fine to cancel the source representing the whole request body, and doing so allows us to avoid
                // creating another CTS instance and the associated nodes inside of it.  With this, cancellation will be
                // requested on _requestBodyCancellationSource when we need to cancel the request stream for any reason,
                // such as receiving an RST_STREAM or when the passed in token has cancellation requested. However, to
                // avoid unnecessarily registering with the cancellation token unless we have to, we wait to do so until
                // either we know we need to do a Expect: 100-continue send or until we know that the copying of our
                // content completed asynchronously.
                CancellationTokenRegistration linkedRegistration = default;
                bool sendRequestContent = true;
                try
                {
                    if (_expect100ContinueWaiter != null)
                    {
                        linkedRegistration = RegisterRequestBodyCancellation(cancellationToken);
                        sendRequestContent = await WaitFor100ContinueAsync(_requestBodyCancellationSource.Token).ConfigureAwait(false);
                    }
 
                    if (sendRequestContent)
                    {
                        using var writeStream = new Http2WriteStream(this, _request.Content.Headers.ContentLength.GetValueOrDefault(-1));
 
                        if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.RequestContentStart();
 
                        ValueTask vt = _request.Content.InternalCopyToAsync(writeStream, context: null, _requestBodyCancellationSource.Token);
                        if (vt.IsCompleted)
                        {
                            vt.GetAwaiter().GetResult();
                        }
                        else
                        {
                            if (linkedRegistration.Equals(default))
                            {
                                linkedRegistration = RegisterRequestBodyCancellation(cancellationToken);
                            }
 
                            await vt.ConfigureAwait(false);
                        }
 
                        if (writeStream.BytesWritten < writeStream.ContentLength)
                        {
                            // The number of bytes we actually sent doesn't match the advertised Content-Length
                            throw new HttpRequestException(SR.Format(SR.net_http_request_content_length_mismatch, writeStream.BytesWritten, writeStream.ContentLength));
                        }
 
                        if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.RequestContentStop(writeStream.BytesWritten);
                    }
 
                    if (NetEventSource.Log.IsEnabled()) Trace($"Finished sending request body.");
                }
                catch (Exception e)
                {
                    if (NetEventSource.Log.IsEnabled()) Trace($"Failed to send request body: {e}");
                    bool signalWaiter;
 
                    Debug.Assert(!Monitor.IsEntered(SyncObject));
                    lock (SyncObject)
                    {
                        Debug.Assert(_requestCompletionState == StreamCompletionState.InProgress, $"Request already completed with state={_requestCompletionState}");
 
                        if (_requestBodyAbandoned)
                        {
                            // See comments on _requestBodyAbandoned.
                            // In this case, the request is still considered successful and we do not want to send a RST_STREAM,
                            // and we also don't want to propagate any error to the caller, in particular for non-duplex scenarios.
                            Debug.Assert(_responseCompletionState == StreamCompletionState.Completed);
                            _requestCompletionState = StreamCompletionState.Completed;
                            Debug.Assert(!ConnectProtocolEstablished);
                            Complete();
                            return;
                        }
 
                        // This should not cause RST_STREAM to be sent because the request is still marked as in progress.
                        bool sendReset;
                        (signalWaiter, sendReset) = CancelResponseBody();
                        Debug.Assert(!sendReset);
 
                        _requestCompletionState = StreamCompletionState.Failed;
                        SendReset();
                        Debug.Assert(!ConnectProtocolEstablished);
                        Complete();
                    }
 
                    if (signalWaiter)
                    {
                        _waitSource.SetResult(true);
                    }
 
                    throw;
                }
                finally
                {
                    linkedRegistration.Dispose();
                }
 
                // New scope here to avoid variable name conflict on "sendReset"
                {
                    Debug.Assert(!Monitor.IsEntered(SyncObject));
                    bool sendReset = false;
                    lock (SyncObject)
                    {
                        Debug.Assert(_requestCompletionState == StreamCompletionState.InProgress, $"Request already completed with state={_requestCompletionState}");
                        _requestCompletionState = StreamCompletionState.Completed;
 
                        bool complete = false;
                        if (_responseCompletionState != StreamCompletionState.InProgress)
                        {
                            // Note, we can reach this point if the response stream failed but cancellation didn't propagate before we finished.
                            sendReset = _responseCompletionState == StreamCompletionState.Failed;
                            complete = true;
                        }
 
                        if (sendReset)
                        {
                            SendReset();
                        }
                        else if (!sendRequestContent)
                        {
                            // Request body hasn't been sent, so we need to notify the server that it won't
                            // get the body. However, we cannot do it right here because the server can
                            // reset the whole stream before we will have a chance to read the response body.
                            _sendRstOnResponseClose = true;
                        }
                        else
                        {
                            // Send EndStream asynchronously and without cancellation.
                            // If this fails, it means that the connection is aborting and we will be reset.
                            _connection.LogExceptions(_connection.SendEndStreamAsync(StreamId));
                        }
 
                        if (complete)
                        {
                            Debug.Assert(!ConnectProtocolEstablished);
                            Complete();
                        }
                    }
                }
            }
 
            // Delay sending request body if we sent Expect: 100-continue.
            // We can either get 100 response from server and send body
            // or we may exceed timeout and send request body anyway.
            // If we get response status >= 300, we will not send the request body.
            public async ValueTask<bool> WaitFor100ContinueAsync(CancellationToken cancellationToken)
            {
                Debug.Assert(_request?.Content != null);
                if (NetEventSource.Log.IsEnabled()) Trace($"Waiting to send request body content for 100-Continue.");
 
                // Use TCS created in constructor. It will complete when one of three things occurs:
                // 1. we receive the relevant response from the server.
                // 2. the timer fires before we receive the relevant response from the server.
                // 3. cancellation is requested before we receive the relevant response from the server.
                // We need to run the continuation asynchronously for cases 1 and 3 (for 1 so that we don't starve the body copy operation, and
                // for 3 so that we don't run a lot of work as part of code calling Cancel), so the TCS is created to run continuations asynchronously.
                // We await the created Timer's disposal so that we ensure any work associated with it has quiesced prior to this method
                // returning, just in case this object is pooled and potentially reused for another operation in the future.
                TaskCompletionSource<bool> waiter = _expect100ContinueWaiter!;
                using (cancellationToken.UnsafeRegister(static s => ((TaskCompletionSource<bool>)s!).TrySetResult(false), waiter))
                await using (new Timer(static s =>
                {
                    var thisRef = (Http2Stream)s!;
                    if (NetEventSource.Log.IsEnabled()) thisRef.Trace($"100-Continue timer expired.");
                    thisRef._expect100ContinueWaiter?.TrySetResult(true);
                }, this, _connection._pool.Settings._expect100ContinueTimeout, Timeout.InfiniteTimeSpan).ConfigureAwait(false))
                {
                    bool shouldSendContent = await waiter.Task.ConfigureAwait(false);
                    // By now, either we got a response from the server or the timer expired or cancellation was requested.
                    CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
                    return shouldSendContent;
                }
            }
 
            private void SendReset()
            {
                Debug.Assert(Monitor.IsEntered(SyncObject));
                Debug.Assert(_requestCompletionState != StreamCompletionState.InProgress);
                Debug.Assert(_responseCompletionState != StreamCompletionState.InProgress);
                Debug.Assert(_requestCompletionState == StreamCompletionState.Failed || _responseCompletionState == StreamCompletionState.Failed,
                    "Reset called but neither request nor response is failed");
 
                if (NetEventSource.Log.IsEnabled()) Trace($"Stream reset. Request={_requestCompletionState}, Response={_responseCompletionState}.");
 
                // Don't send a RST_STREAM if we've already received one from the server.
                if (_resetException == null)
                {
                    // If execution reached this line, it's guaranteed that
                    // _requestCompletionState == StreamCompletionState.Failed or _responseCompletionState == StreamCompletionState.Failed
                    _connection.LogExceptions(_connection.SendRstStreamAsync(StreamId, Http2ProtocolErrorCode.Cancel));
                }
            }
 
            private void Complete()
            {
                Debug.Assert(Monitor.IsEntered(SyncObject));
                Debug.Assert(_requestCompletionState != StreamCompletionState.InProgress);
                Debug.Assert(_responseCompletionState != StreamCompletionState.InProgress);
 
                if (NetEventSource.Log.IsEnabled()) Trace($"Stream complete. Request={_requestCompletionState}, Response={_responseCompletionState}.");
 
                _connection.RemoveStream(this);
 
                lock (_creditSyncObject)
                {
                    CreditWaiter? waiter = _creditWaiter;
                    if (waiter != null)
                    {
                        waiter.Dispose();
                        _creditWaiter = null;
                    }
                }
            }
 
            private void Cancel()
            {
                if (NetEventSource.Log.IsEnabled()) Trace("");
 
                CancellationTokenSource? requestBodyCancellationSource = null;
                bool signalWaiter = false;
                bool sendReset = false;
 
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    if (_requestCompletionState == StreamCompletionState.InProgress)
                    {
                        requestBodyCancellationSource = _requestBodyCancellationSource;
                        Debug.Assert(requestBodyCancellationSource != null);
                    }
 
                    (signalWaiter, sendReset) = CancelResponseBody();
                }
 
                // When cancellation propagates, SendRequestBodyAsync will set _requestCompletionState to Failed
                requestBodyCancellationSource?.Cancel();
 
                lock (SyncObject)
                {
                    if (sendReset)
                    {
                        SendReset();
 
                        // Extended CONNECT notes:
                        //
                        // To prevent from calling it *twice*, Extended CONNECT stream's Complete() is only
                        // called from CloseResponseBody(), as CloseResponseBody() is *always* called
                        // from Extended CONNECT stream's Dispose().
 
                        if (!ConnectProtocolEstablished)
                        {
                            Complete();
                        }
                    }
                }
 
                if (signalWaiter)
                {
                    _waitSource.SetResult(true);
                }
            }
 
            // Returns whether the waiter should be signalled or not.
            private (bool signalWaiter, bool sendReset) CancelResponseBody()
            {
                Debug.Assert(Monitor.IsEntered(SyncObject));
 
                bool sendReset = _sendRstOnResponseClose;
 
                if (_responseCompletionState == StreamCompletionState.InProgress)
                {
                    _responseCompletionState = StreamCompletionState.Failed;
                    if (_requestCompletionState != StreamCompletionState.InProgress)
                    {
                        sendReset = true;
                    }
                }
 
                // Discard any remaining buffered response data
                _responseBuffer.DiscardAll();
 
                _responseProtocolState = ResponseProtocolState.Aborted;
 
                bool signalWaiter = _hasWaiter;
                _hasWaiter = false;
 
                return (signalWaiter, sendReset);
            }
 
            public void OnWindowUpdate(int amount)
            {
                lock (_creditSyncObject)
                {
                    _availableCredit = checked(_availableCredit + amount);
                    if (_availableCredit > 0 && _creditWaiter != null)
                    {
                        int granted = Math.Min(_availableCredit, _creditWaiter.Amount);
                        if (_creditWaiter.TrySetResult(granted))
                        {
                            _availableCredit -= granted;
                        }
                    }
                }
            }
 
            private const int FirstHPackRequestPseudoHeaderId = 1;
            private const int LastHPackRequestPseudoHeaderId = 7;
            private const int FirstHPackStatusPseudoHeaderId = 8;
            private const int LastHPackStatusPseudoHeaderId = 14;
            private const int FirstHPackNormalHeaderId = 15;
            private const int LastHPackNormalHeaderId = 61;
 
            private static ReadOnlySpan<int> HpackStaticStatusCodeTable => [200, 204, 206, 304, 400, 404, 500];
 
            private static readonly (HeaderDescriptor descriptor, byte[] value)[] s_hpackStaticHeaderTable = new (HeaderDescriptor, byte[])[LastHPackNormalHeaderId - FirstHPackNormalHeaderId + 1]
            {
                (KnownHeaders.AcceptCharset.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.AcceptEncoding.Descriptor, "gzip, deflate"u8.ToArray()),
                (KnownHeaders.AcceptLanguage.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.AcceptRanges.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Accept.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.AccessControlAllowOrigin.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Age.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Allow.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Authorization.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.CacheControl.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentDisposition.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentEncoding.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentLanguage.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentLength.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentLocation.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentRange.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ContentType.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Cookie.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Date.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ETag.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Expect.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Expires.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.From.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Host.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.IfMatch.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.IfModifiedSince.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.IfNoneMatch.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.IfRange.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.IfUnmodifiedSince.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.LastModified.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Link.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Location.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.MaxForwards.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ProxyAuthenticate.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.ProxyAuthorization.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Range.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Referer.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Refresh.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.RetryAfter.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Server.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.SetCookie.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.StrictTransportSecurity.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.TransferEncoding.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.UserAgent.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Vary.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.Via.Descriptor, Array.Empty<byte>()),
                (KnownHeaders.WWWAuthenticate.Descriptor, Array.Empty<byte>()),
            };
 
            void IHttpStreamHeadersHandler.OnStaticIndexedHeader(int index)
            {
                Debug.Assert(index >= FirstHPackRequestPseudoHeaderId && index <= LastHPackNormalHeaderId);
 
                if (index <= LastHPackRequestPseudoHeaderId)
                {
                    if (NetEventSource.Log.IsEnabled()) Trace($"Invalid request pseudo-header ID {index}.");
                    throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response);
                }
                else if (index <= LastHPackStatusPseudoHeaderId)
                {
                    int statusCode = HpackStaticStatusCodeTable[index - FirstHPackStatusPseudoHeaderId];
 
                    OnStatus(statusCode);
                }
                else
                {
                    (HeaderDescriptor descriptor, byte[] value) = s_hpackStaticHeaderTable[index - FirstHPackNormalHeaderId];
 
                    OnHeader(descriptor, value);
                }
            }
 
            void IHttpStreamHeadersHandler.OnStaticIndexedHeader(int index, ReadOnlySpan<byte> value)
            {
                Debug.Assert(index >= FirstHPackRequestPseudoHeaderId && index <= LastHPackNormalHeaderId);
 
                if (index <= LastHPackRequestPseudoHeaderId)
                {
                    if (NetEventSource.Log.IsEnabled()) Trace($"Invalid request pseudo-header ID {index}.");
                    throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response);
                }
                else if (index <= LastHPackStatusPseudoHeaderId)
                {
                    int statusCode = ParseStatusCode(value);
 
                    OnStatus(statusCode);
                }
                else
                {
                    (HeaderDescriptor descriptor, _) = s_hpackStaticHeaderTable[index - FirstHPackNormalHeaderId];
 
                    OnHeader(descriptor, value);
                }
            }
 
            void IHttpStreamHeadersHandler.OnDynamicIndexedHeader(int? index, ReadOnlySpan<byte> name, ReadOnlySpan<byte> value)
            {
                OnHeader(name, value);
            }
 
            private void AdjustHeaderBudget(int amount)
            {
                _headerBudgetRemaining -= amount;
                if (_headerBudgetRemaining < 0)
                {
                    throw new HttpRequestException(HttpRequestError.ConfigurationLimitExceeded, SR.Format(SR.net_http_response_headers_exceeded_length, _connection._pool.Settings.MaxResponseHeadersByteLength));
                }
            }
 
            private void OnStatus(int statusCode)
            {
                if (NetEventSource.Log.IsEnabled()) Trace($"Status code is {statusCode}");
 
                AdjustHeaderBudget(10); // for ":status" plus 3-digit status code
 
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    if (_responseProtocolState == ResponseProtocolState.Aborted)
                    {
                        // We could have aborted while processing the header block.
                        return;
                    }
 
                    if (_responseProtocolState == ResponseProtocolState.ExpectingHeaders)
                    {
                        if (NetEventSource.Log.IsEnabled()) Trace("Received extra status header.");
                        throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response_multiple_status_codes);
                    }
 
                    if (_responseProtocolState != ResponseProtocolState.ExpectingStatus)
                    {
                        // Pseudo-headers are allowed only in header block
                        if (NetEventSource.Log.IsEnabled()) Trace($"Status pseudo-header received in {_responseProtocolState} state.");
                        throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response_pseudo_header_in_trailer);
                    }
 
                    Debug.Assert(_response != null);
                    _response.StatusCode = (HttpStatusCode)statusCode;
 
                    if (statusCode < 200)
                    {
                        // We do not process headers from 1xx responses.
                        _responseProtocolState = ResponseProtocolState.ExpectingIgnoredHeaders;
 
                        if (_response.StatusCode == HttpStatusCode.Continue && _expect100ContinueWaiter != null)
                        {
                            if (NetEventSource.Log.IsEnabled()) Trace("Received 100-Continue status.");
                            _expect100ContinueWaiter.TrySetResult(true);
                        }
                    }
                    else
                    {
                        if (statusCode == 200 && _response.RequestMessage!.IsExtendedConnectRequest)
                        {
                            ConnectProtocolEstablished = true;
                        }
 
                        _responseProtocolState = ResponseProtocolState.ExpectingHeaders;
 
                        // If we are waiting for a 100-continue response, signal the waiter now.
                        if (_expect100ContinueWaiter != null)
                        {
                            // If the final status code is >= 300, skip sending the body.
                            bool shouldSendBody = (statusCode < 300);
 
                            if (NetEventSource.Log.IsEnabled()) Trace($"Expecting 100 Continue but received final status {statusCode}.");
                            _expect100ContinueWaiter.TrySetResult(shouldSendBody);
                        }
                    }
                }
            }
 
            private void OnHeader(HeaderDescriptor descriptor, ReadOnlySpan<byte> value)
            {
                if (NetEventSource.Log.IsEnabled()) Trace($"{descriptor.Name}: {Encoding.ASCII.GetString(value)}");
 
                AdjustHeaderBudget(descriptor.Name.Length + value.Length);
 
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    if (_responseProtocolState == ResponseProtocolState.Aborted)
                    {
                        // We could have aborted while processing the header block.
                        return;
                    }
 
                    if (_responseProtocolState == ResponseProtocolState.ExpectingIgnoredHeaders)
                    {
                        // for 1xx response we ignore all headers.
                        return;
                    }
 
                    if (_responseProtocolState != ResponseProtocolState.ExpectingHeaders && _responseProtocolState != ResponseProtocolState.ExpectingTrailingHeaders)
                    {
                        if (NetEventSource.Log.IsEnabled()) Trace("Received header before status.");
                        throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response);
                    }
 
                    Encoding? valueEncoding = _connection._pool.Settings._responseHeaderEncodingSelector?.Invoke(descriptor.Name, _request);
 
                    // Note we ignore the return value from TryAddWithoutValidation;
                    // if the header can't be added, we silently drop it.
                    if (_responseProtocolState == ResponseProtocolState.ExpectingTrailingHeaders)
                    {
                        Debug.Assert(_trailers != null);
                        string headerValue = descriptor.GetHeaderValue(value, valueEncoding);
                        _trailers.TryAddWithoutValidation((descriptor.HeaderType & HttpHeaderType.Request) == HttpHeaderType.Request ? descriptor.AsCustomHeader() : descriptor, headerValue);
                    }
                    else if ((descriptor.HeaderType & HttpHeaderType.Content) == HttpHeaderType.Content)
                    {
                        Debug.Assert(_response != null && _response.Content != null);
                        string headerValue = descriptor.GetHeaderValue(value, valueEncoding);
                        _response.Content.Headers.TryAddWithoutValidation(descriptor, headerValue);
                    }
                    else
                    {
                        Debug.Assert(_response != null);
                        string headerValue = _connection.GetResponseHeaderValueWithCaching(descriptor, value, valueEncoding);
                        _response.Headers.TryAddWithoutValidation((descriptor.HeaderType & HttpHeaderType.Request) == HttpHeaderType.Request ? descriptor.AsCustomHeader() : descriptor, headerValue);
                    }
                }
            }
 
            public void OnHeader(ReadOnlySpan<byte> name, ReadOnlySpan<byte> value)
            {
                Debug.Assert(name.Length > 0);
 
                if (name[0] == (byte)':')
                {
                    // Pseudo-header
                    if (name.SequenceEqual(StatusHeaderName))
                    {
                        int statusCode = ParseStatusCode(value);
 
                        OnStatus(statusCode);
                    }
                    else
                    {
                        if (NetEventSource.Log.IsEnabled()) Trace($"Invalid response pseudo-header '{Encoding.ASCII.GetString(name)}'.");
                        throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.net_http_invalid_response);
                    }
                }
                else
                {
                    // Regular header
                    if (!HeaderDescriptor.TryGet(name, out HeaderDescriptor descriptor))
                    {
                        // Invalid header name
                        throw new HttpRequestException(HttpRequestError.InvalidResponse, SR.Format(SR.net_http_invalid_response_header_name, Encoding.ASCII.GetString(name)));
                    }
 
                    OnHeader(descriptor, value);
                }
            }
 
            public void OnHeadersStart()
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    switch (_responseProtocolState)
                    {
                        case ResponseProtocolState.ExpectingStatus:
                        case ResponseProtocolState.Aborted:
                            break;
 
                        case ResponseProtocolState.ExpectingData:
                            _responseProtocolState = ResponseProtocolState.ExpectingTrailingHeaders;
                            _trailers ??= new HttpResponseHeaders(containsTrailingHeaders: true);
                            break;
 
                        default:
                            ThrowProtocolError();
                            break;
                    }
                }
            }
 
            public void OnHeadersComplete(bool endStream)
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                bool signalWaiter;
                lock (SyncObject)
                {
                    switch (_responseProtocolState)
                    {
                        case ResponseProtocolState.Aborted:
                            return;
 
                        case ResponseProtocolState.ExpectingHeaders:
                            _responseProtocolState = endStream ? ResponseProtocolState.Complete : ResponseProtocolState.ExpectingData;
                            _responseHeadersReceived = true;
                            break;
 
                        case ResponseProtocolState.ExpectingTrailingHeaders:
                            if (!endStream)
                            {
                                if (NetEventSource.Log.IsEnabled()) Trace("Trailing headers received without endStream");
                                ThrowProtocolError();
                            }
                            _responseProtocolState = ResponseProtocolState.Complete;
                            break;
 
                        case ResponseProtocolState.ExpectingIgnoredHeaders:
                            if (endStream)
                            {
                                // we should not get endStream while processing 1xx response.
                                ThrowProtocolError();
                            }
 
                            // We should wait for final response before signaling to waiter.
                            _responseProtocolState = ResponseProtocolState.ExpectingStatus;
                            return;
 
                        default:
                            ThrowProtocolError();
                            break;
                    }
 
                    if (endStream)
                    {
                        Debug.Assert(_responseCompletionState == StreamCompletionState.InProgress, $"Response already completed with state={_responseCompletionState}");
 
                        _responseCompletionState = StreamCompletionState.Completed;
 
                        // Extended CONNECT notes:
                        //
                        // To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
                        // called from CloseResponseBody(), as CloseResponseBody() is *only* called
                        // from Extended CONNECT stream's Dispose().
                        //
                        // Due to bidirectional streaming nature of the Extended CONNECT request,
                        // the *write side* of the stream can only be completed by calling Dispose().
                        //
                        // The streaming in both ways happens over the single "response" stream instance, which makes
                        // _requestCompletionState *not indicative* of the actual state of the write side of the stream.
 
                        if (_requestCompletionState == StreamCompletionState.Completed && !ConnectProtocolEstablished)
                        {
                            Complete();
                        }
 
                        // We should never reach here with the request failed. It's only set to Failed in SendRequestBodyAsync after we've called Cancel,
                        // which will set the _responseCompletionState to Failed, meaning we'll never get here.
                        Debug.Assert(_requestCompletionState != StreamCompletionState.Failed);
                    }
 
                    if (_responseProtocolState == ResponseProtocolState.ExpectingData)
                    {
                        _windowManager.Start();
                    }
                    signalWaiter = _hasWaiter;
                    _hasWaiter = false;
                }
 
                if (signalWaiter)
                {
                    _waitSource.SetResult(true);
                }
            }
 
            public void OnResponseData(ReadOnlySpan<byte> buffer, bool endStream)
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                bool signalWaiter;
                lock (SyncObject)
                {
                    switch (_responseProtocolState)
                    {
                        case ResponseProtocolState.ExpectingData:
                            break;
 
                        case ResponseProtocolState.Aborted:
                            return;
 
                        default:
                            // Flow control messages are not valid in this state.
                            ThrowProtocolError();
                            break;
                    }
 
                    if (_responseBuffer.ActiveMemory.Length + buffer.Length > _windowManager.StreamWindowSize)
                    {
                        // Window size exceeded.
                        ThrowProtocolError(Http2ProtocolErrorCode.FlowControlError);
                    }
 
                    _responseBuffer.EnsureAvailableSpace(buffer.Length);
                    _responseBuffer.AvailableMemory.CopyFrom(buffer);
                    _responseBuffer.Commit(buffer.Length);
 
                    if (endStream)
                    {
                        _responseProtocolState = ResponseProtocolState.Complete;
 
                        Debug.Assert(_responseCompletionState == StreamCompletionState.InProgress, $"Response already completed with state={_responseCompletionState}");
 
                        _responseCompletionState = StreamCompletionState.Completed;
 
                        // Extended CONNECT notes:
                        //
                        // To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
                        // called from CloseResponseBody(), as CloseResponseBody() is *only* called
                        // from Extended CONNECT stream's Dispose().
                        //
                        // Due to bidirectional streaming nature of the Extended CONNECT request,
                        // the *write side* of the stream can only be completed by calling Dispose().
                        //
                        // The streaming in both ways happens over the single "response" stream instance, which makes
                        // _requestCompletionState *not indicative* of the actual state of the write side of the stream.
 
                        if (_requestCompletionState == StreamCompletionState.Completed && !ConnectProtocolEstablished)
                        {
                            Complete();
                        }
 
                        // We should never reach here with the request failed. It's only set to Failed in SendRequestBodyAsync after we've called Cancel,
                        // which will set the _responseCompletionState to Failed, meaning we'll never get here.
                        Debug.Assert(_requestCompletionState != StreamCompletionState.Failed);
                    }
 
                    signalWaiter = _hasWaiter;
                    _hasWaiter = false;
                }
 
                if (signalWaiter)
                {
                    _waitSource.SetResult(true);
                }
            }
 
            // This is called in several different cases:
            // (1) Receiving RST_STREAM on this stream. If so, the resetStreamErrorCode will be non-null, and canRetry will be true only if the error code was REFUSED_STREAM.
            // (2) Receiving GOAWAY that indicates this stream has not been processed. If so, canRetry will be true.
            // (3) Connection IO failure or protocol violation. If so, resetException will contain the relevant exception and canRetry will be false.
            // (4) Receiving EOF from the server. If so, resetException will contain an exception like "expected 9 bytes of data", and canRetry will be false.
            public void OnReset(Exception resetException, Http2ProtocolErrorCode? resetStreamErrorCode = null, bool canRetry = false)
            {
                if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(resetException)}={resetException}, {nameof(resetStreamErrorCode)}={resetStreamErrorCode}");
 
                bool cancel = false;
                CancellationTokenSource? requestBodyCancellationSource = null;
 
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    // If we've already finished, don't actually reset the stream.
                    // Otherwise, any waiters that haven't executed yet will see the _resetException and throw.
                    // This can happen, for example, when the server finishes the request and then closes the connection,
                    // but the waiter hasn't woken up yet.
                    if (_requestCompletionState == StreamCompletionState.Completed && _responseCompletionState == StreamCompletionState.Completed)
                    {
                        return;
                    }
 
                    // It's possible we could be called twice, e.g. we receive a RST_STREAM and then the whole connection dies
                    // before we have a chance to process cancellation and tear everything down. Just ignore this.
                    if (_resetException != null)
                    {
                        return;
                    }
 
                    // If the server told us the request has not been processed (via Last-Stream-ID on GOAWAY),
                    // but we've already received some response data from the server, then the server lied to us.
                    // In this case, don't allow the request to be retried.
                    if (canRetry && _responseProtocolState != ResponseProtocolState.ExpectingStatus)
                    {
                        canRetry = false;
                    }
 
                    // Per section 8.1 in the RFC:
                    // If the server has completed the response body (i.e. we've received EndStream)
                    // but the request body is still sending, and we then receive a RST_STREAM with errorCode = NO_ERROR,
                    // we treat this specially and simply cancel sending the request body, rather than treating
                    // the entire request as failed.
                    if (resetStreamErrorCode == Http2ProtocolErrorCode.NoError &&
                        _responseCompletionState == StreamCompletionState.Completed)
                    {
                        if (_requestCompletionState == StreamCompletionState.InProgress)
                        {
                            _requestBodyAbandoned = true;
                            requestBodyCancellationSource = _requestBodyCancellationSource;
                            Debug.Assert(requestBodyCancellationSource != null);
                        }
                    }
                    else
                    {
                        _resetException = resetException;
                        _canRetry = canRetry;
                        cancel = true;
                    }
                }
 
                if (requestBodyCancellationSource != null)
                {
                    Debug.Assert(_requestBodyAbandoned);
                    Debug.Assert(!cancel);
                    requestBodyCancellationSource.Cancel();
                }
                else
                {
                    Cancel();
                }
            }
 
            private void CheckResponseBodyState()
            {
                Debug.Assert(Monitor.IsEntered(SyncObject));
 
                if (_resetException is Exception resetException)
                {
                    if (_canRetry)
                    {
                        ThrowRetry(SR.net_http_request_aborted, resetException);
                    }
 
                    ThrowRequestAborted(resetException);
                }
 
                if (_responseProtocolState == ResponseProtocolState.Aborted)
                {
                    ThrowRequestAborted();
                }
            }
 
            // Determine if we have enough data to process up to complete final response headers.
            private (bool wait, bool isEmptyResponse) TryEnsureHeaders()
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    if (!_responseHeadersReceived)
                    {
                        CheckResponseBodyState();
                        Debug.Assert(!_hasWaiter);
                        _hasWaiter = true;
                        _waitSource.Reset();
                        return (true, false);
                    }
 
                    return (false, _responseProtocolState == ResponseProtocolState.Complete && _responseBuffer.IsEmpty);
                }
            }
 
            public async Task ReadResponseHeadersAsync(CancellationToken cancellationToken)
            {
                bool emptyResponse;
                try
                {
                    if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.ResponseHeadersStart();
 
                    // Wait for response headers to be read.
                    bool wait;
 
                    // Process all informational responses if any and wait for final status.
                    (wait, emptyResponse) = TryEnsureHeaders();
                    if (wait)
                    {
                        await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
 
                        (wait, emptyResponse) = TryEnsureHeaders();
                        Debug.Assert(!wait);
                    }
 
                    Debug.Assert(_response is not null);
                    if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.ResponseHeadersStop((int)_response.StatusCode);
                }
                catch
                {
                    Cancel();
                    throw;
                }
 
                Debug.Assert(_response != null && _response.Content != null);
                // Start to process the response body.
                var responseContent = (HttpConnectionResponseContent)_response.Content;
                if (ConnectProtocolEstablished)
                {
                    responseContent.SetStream(new Http2ReadWriteStream(this, closeResponseBodyOnDispose: true));
                }
                else if (emptyResponse)
                {
                    // If there are any trailers, copy them over to the response.  Normally this would be handled by
                    // the response stream hitting EOF, but if there is no response body, we do it here.
                    MoveTrailersToResponseMessage(_response);
                    responseContent.SetStream(EmptyReadStream.Instance);
                }
                else
                {
                    responseContent.SetStream(new Http2ReadStream(this));
                }
                if (NetEventSource.Log.IsEnabled()) Trace($"Received response: {_response}");
 
                // Process Set-Cookie headers.
                if (_connection._pool.Settings._useCookies)
                {
                    CookieHelper.ProcessReceivedCookies(_response, _connection._pool.Settings._cookieContainer!);
                }
            }
 
            private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer, bool partOfSyncRead = false)
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    CheckResponseBodyState();
 
                    if (!_responseBuffer.IsEmpty)
                    {
                        MultiMemory activeBuffer = _responseBuffer.ActiveMemory;
                        int bytesRead = Math.Min(buffer.Length, activeBuffer.Length);
                        activeBuffer.Slice(0, bytesRead).CopyTo(buffer);
                        _responseBuffer.Discard(bytesRead);
 
                        return (false, bytesRead);
                    }
                    else if (_responseProtocolState == ResponseProtocolState.Complete)
                    {
                        return (false, 0);
                    }
 
                    Debug.Assert(_responseProtocolState == ResponseProtocolState.ExpectingData || _responseProtocolState == ResponseProtocolState.ExpectingTrailingHeaders);
 
                    Debug.Assert(!_hasWaiter);
                    _hasWaiter = true;
                    _waitSource.Reset();
                    _waitSource.RunContinuationsAsynchronously = !partOfSyncRead;
                    return (true, 0);
                }
            }
 
            public int ReadData(Span<byte> buffer, HttpResponseMessage responseMessage)
            {
                (bool wait, int bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
                if (wait)
                {
                    // Synchronously block waiting for data to be produced.
                    Debug.Assert(bytesRead == 0);
                    WaitForData();
                    (wait, bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
                    Debug.Assert(!wait);
                }
 
                if (bytesRead != 0)
                {
                    _windowManager.AdjustWindow(bytesRead, this);
                }
                else if (buffer.Length != 0)
                {
                    // We've hit EOF.  Pull in from the Http2Stream any trailers that were temporarily stored there.
                    MoveTrailersToResponseMessage(responseMessage);
                }
 
                return bytesRead;
            }
 
            public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, HttpResponseMessage responseMessage, CancellationToken cancellationToken)
            {
                (bool wait, int bytesRead) = TryReadFromBuffer(buffer.Span);
                if (wait)
                {
                    Debug.Assert(bytesRead == 0);
                    await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
                    (wait, bytesRead) = TryReadFromBuffer(buffer.Span);
                    Debug.Assert(!wait);
                }
 
                if (bytesRead != 0)
                {
                    _windowManager.AdjustWindow(bytesRead, this);
                }
                else if (buffer.Length != 0)
                {
                    // We've hit EOF.  Pull in from the Http2Stream any trailers that were temporarily stored there.
                    MoveTrailersToResponseMessage(responseMessage);
                }
 
                return bytesRead;
            }
 
            public void CopyTo(HttpResponseMessage responseMessage, Stream destination, int bufferSize)
            {
                byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
                try
                {
                    // Generally the same logic as in ReadData, but wrapped in a loop where every read segment is written to the destination.
                    while (true)
                    {
                        (bool wait, int bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
                        if (wait)
                        {
                            Debug.Assert(bytesRead == 0);
                            WaitForData();
                            (wait, bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
                            Debug.Assert(!wait);
                        }
 
                        if (bytesRead != 0)
                        {
                            _windowManager.AdjustWindow(bytesRead, this);
                            destination.Write(new ReadOnlySpan<byte>(buffer, 0, bytesRead));
                        }
                        else
                        {
                            // We've hit EOF.  Pull in from the Http2Stream any trailers that were temporarily stored there.
                            MoveTrailersToResponseMessage(responseMessage);
                            return;
                        }
                    }
                }
                finally
                {
                    ArrayPool<byte>.Shared.Return(buffer);
                }
            }
 
            public async Task CopyToAsync(HttpResponseMessage responseMessage, Stream destination, int bufferSize, CancellationToken cancellationToken)
            {
                byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
                try
                {
                    // Generally the same logic as in ReadDataAsync, but wrapped in a loop where every read segment is written to the destination.
                    while (true)
                    {
                        (bool wait, int bytesRead) = TryReadFromBuffer(buffer);
                        if (wait)
                        {
                            Debug.Assert(bytesRead == 0);
                            await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
                            (wait, bytesRead) = TryReadFromBuffer(buffer);
                            Debug.Assert(!wait);
                        }
 
                        if (bytesRead != 0)
                        {
                            _windowManager.AdjustWindow(bytesRead, this);
                            await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
                        }
                        else
                        {
                            // We've hit EOF.  Pull in from the Http2Stream any trailers that were temporarily stored there.
                            MoveTrailersToResponseMessage(responseMessage);
                            return;
                        }
                    }
                }
                finally
                {
                    ArrayPool<byte>.Shared.Return(buffer);
                }
            }
 
            private void MoveTrailersToResponseMessage(HttpResponseMessage responseMessage)
            {
                if (_trailers != null)
                {
                    responseMessage.StoreReceivedTrailingHeaders(_trailers);
                }
            }
 
            private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
            {
                Debug.Assert(_requestBodyCancellationSource != null);
 
                // Cancel the request body sending if cancellation is requested on the supplied cancellation token.
                CancellationTokenRegistration linkedRegistration = cancellationToken.CanBeCanceled && cancellationToken != _requestBodyCancellationSource.Token ?
                    RegisterRequestBodyCancellation(cancellationToken) :
                    default;
 
                try
                {
                    while (buffer.Length > 0)
                    {
                        int sendSize = -1;
                        bool flush = false;
                        lock (_creditSyncObject)
                        {
                            if (_availableCredit > 0)
                            {
                                sendSize = Math.Min(buffer.Length, _availableCredit);
                                _availableCredit -= sendSize;
 
                                // Force a flush if we are out of credit, because we don't know that we will be sending more data any time soon
                                if (_availableCredit == 0)
                                {
                                    flush = true;
                                }
                            }
                            else
                            {
                                if (_creditWaiter is null)
                                {
                                    _creditWaiter = new CreditWaiter(_requestBodyCancellationSource.Token);
                                }
                                else
                                {
                                    _creditWaiter.ResetForAwait(_requestBodyCancellationSource.Token);
                                }
                                _creditWaiter.Amount = buffer.Length;
                            }
                        }
 
                        if (sendSize == -1)
                        {
                            // Logically this is part of the else block above, but we can't await while holding the lock.
                            Debug.Assert(_creditWaiter != null);
                            sendSize = await _creditWaiter.AsValueTask().ConfigureAwait(false);
 
                            lock (_creditSyncObject)
                            {
                                // Force a flush if we are out of credit, because we don't know that we will be sending more data any time soon
                                if (_availableCredit == 0)
                                {
                                    flush = true;
                                }
                            }
                        }
 
                        Debug.Assert(sendSize > 0);
 
                        ReadOnlyMemory<byte> current;
                        (current, buffer) = SplitBuffer(buffer, sendSize);
 
                        await _connection.SendStreamDataAsync(StreamId, current, flush, _requestBodyCancellationSource.Token).ConfigureAwait(false);
                    }
                }
                catch (OperationCanceledException e) when (e.CancellationToken == _requestBodyCancellationSource.Token)
                {
                    lock (SyncObject)
                    {
                        if (_resetException is Exception resetException)
                        {
                            if (_canRetry)
                            {
                                ThrowRetry(SR.net_http_request_aborted, resetException);
                            }
 
                            ThrowRequestAborted(resetException);
                        }
                    }
 
                    throw;
                }
                finally
                {
                    linkedRegistration.Dispose();
                }
            }
 
            // This method should only be called from Http2ReadWriteStream.Dispose()
            private void CloseResponseBody()
            {
                // Extended CONNECT notes:
                //
                // Due to bidirectional streaming nature of the Extended CONNECT request,
                // the *write side* of the stream can only be completed by calling Dispose()
                // (which, for Extended CONNECT case, will in turn call CloseResponseBody())
                //
                // Similarly to QuicStream, disposal *gracefully* closes the write side of the stream
                // (unless we've received RST_STREAM before) and *abortively* closes the read side
                // of the stream (unless we've received EOS before).
 
                if (ConnectProtocolEstablished && _resetException is null)
                {
                    // Gracefully close the write side of the Extended CONNECT stream
                    _connection.LogExceptions(_connection.SendEndStreamAsync(StreamId));
                }
 
                // Check if the response body has been fully consumed.
                bool fullyConsumed = false;
                Debug.Assert(!Monitor.IsEntered(SyncObject));
                lock (SyncObject)
                {
                    if (_responseBuffer.IsEmpty && _responseProtocolState == ResponseProtocolState.Complete)
                    {
                        fullyConsumed = true;
                    }
                }
 
                // If the response body isn't completed, cancel it now.
                // This includes aborting the read side of the Extended CONNECT stream.
                if (!fullyConsumed)
                {
                    Cancel();
                }
                else if (_sendRstOnResponseClose)
                {
                    // Send RST_STREAM with CANCEL to notify the server that it shouldn't
                    // expect the request body.
                    // If this fails, it means that the connection is aborting and we will be reset.
                    _connection.LogExceptions(_connection.SendRstStreamAsync(StreamId, Http2ProtocolErrorCode.Cancel));
                }
 
                lock (SyncObject)
                {
                    if (ConnectProtocolEstablished)
                    {
                        // This should be the only place where Extended Connect stream is completed
                        Complete();
                    }
 
                    _responseBuffer.Dispose();
                }
            }
 
            private CancellationTokenRegistration RegisterRequestBodyCancellation(CancellationToken cancellationToken) =>
                cancellationToken.UnsafeRegister(static s => ((CancellationTokenSource)s!).Cancel(), _requestBodyCancellationSource);
 
            // This object is itself usable as a backing source for ValueTask.  Since there's only ever one awaiter
            // for this object's state transitions at a time, we allow the object to be awaited directly. All functionality
            // associated with the implementation is just delegated to the ManualResetValueTaskSourceCore.
            ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _waitSource.GetStatus(token);
            void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags);
            void IValueTaskSource.GetResult(short token)
            {
                Debug.Assert(!Monitor.IsEntered(SyncObject));
 
                // Clean up the registration.  It's important to Dispose rather than Unregister, so that we wait
                // for any in-flight cancellation to complete.
                _waitSourceCancellation.Dispose();
                _waitSourceCancellation = default;
 
                // Propagate any exceptions if there were any.
                _waitSource.GetResult(token);
            }
 
            private void WaitForData()
            {
                // See comments in WaitAsync.
                Debug.Assert(!_waitSource.RunContinuationsAsynchronously);
                new ValueTask(this, _waitSource.Version).AsTask().GetAwaiter().GetResult();
            }
 
            private ValueTask WaitForDataAsync(CancellationToken cancellationToken)
            {
                Debug.Assert(_waitSource.RunContinuationsAsynchronously);
 
                // No locking is required here to access _waitSource.  To be here, we've already updated _hasWaiter (while holding the lock)
                // to indicate that we would be creating this waiter, and at that point the only code that could be await'ing _waitSource or
                // Reset'ing it is this code here.  It's possible for this to race with the _waitSource being completed, but that's ok and is
                // handled by _waitSource as one of its primary purposes.  We can't assert _hasWaiter here, though, as once we released the
                // lock, a producer could have seen _hasWaiter as true and both set it to false and signaled _waitSource.
 
                // With HttpClient, the supplied cancellation token will always be cancelable, as HttpClient supplies a token that
                // will have cancellation requested if CancelPendingRequests is called (or when a non-infinite Timeout expires).
                // However, this could still be non-cancelable if HttpMessageInvoker was used, at which point this will only be
                // cancelable if the caller's token was cancelable.
 
                _waitSourceCancellation = cancellationToken.UnsafeRegister(static (s, cancellationToken) =>
                {
                    var thisRef = (Http2Stream)s!;
 
                    bool signalWaiter;
                    Debug.Assert(!Monitor.IsEntered(thisRef.SyncObject));
                    lock (thisRef.SyncObject)
                    {
                        signalWaiter = thisRef._hasWaiter;
                        thisRef._hasWaiter = false;
                    }
 
                    if (signalWaiter)
                    {
                        // Wake up the wait.  It will then immediately check whether cancellation was requested and throw if it was.
                        thisRef._waitSource.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(
                            CancellationHelper.CreateOperationCanceledException(null, cancellationToken)));
                    }
                }, this);
 
                return new ValueTask(this, _waitSource.Version);
            }
 
            public void Trace(string message, [CallerMemberName] string? memberName = null) =>
                _connection.Trace(StreamId, message, memberName);
 
            private enum ResponseProtocolState : byte
            {
                ExpectingStatus,
                ExpectingIgnoredHeaders,
                ExpectingHeaders,
                ExpectingData,
                ExpectingTrailingHeaders,
                Complete,
                Aborted
            }
 
            private enum StreamCompletionState : byte
            {
                InProgress,
                Completed,
                Failed
            }
 
            private sealed class Http2ReadStream : Http2ReadWriteStream
            {
                public Http2ReadStream(Http2Stream http2Stream) : base(http2Stream, closeResponseBodyOnDispose: true) { }
 
                public override bool CanWrite => false;
 
                public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream);
 
                public override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => ValueTask.FromException(new NotSupportedException(SR.net_http_content_readonly_stream));
            }
 
            private sealed class Http2WriteStream : Http2ReadWriteStream
            {
                public long BytesWritten { get; private set; }
 
                public long ContentLength { get; }
 
                public Http2WriteStream(Http2Stream http2Stream, long contentLength) : base(http2Stream)
                {
                    Debug.Assert(contentLength >= -1);
                    ContentLength = contentLength;
                }
 
                public override bool CanRead => false;
 
                public override int Read(Span<byte> buffer) => throw new NotSupportedException(SR.net_http_content_writeonly_stream);
 
                public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) => ValueTask.FromException<int>(new NotSupportedException(SR.net_http_content_writeonly_stream));
 
                public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException(SR.net_http_content_writeonly_stream);
 
                public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => Task.FromException(new NotSupportedException(SR.net_http_content_writeonly_stream));
 
                public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
                {
                    BytesWritten += buffer.Length;
 
                    if ((ulong)BytesWritten > (ulong)ContentLength) // If ContentLength == -1, this will always be false
                    {
                        return ValueTask.FromException(new HttpRequestException(SR.net_http_content_write_larger_than_content_length));
                    }
 
                    return base.WriteAsync(buffer, cancellationToken);
                }
            }
 
            public class Http2ReadWriteStream : HttpBaseStream
            {
                private Http2Stream? _http2Stream;
                private readonly HttpResponseMessage _responseMessage;
 
                public Http2ReadWriteStream(Http2Stream http2Stream, bool closeResponseBodyOnDispose = false)
                {
                    Debug.Assert(http2Stream != null);
                    Debug.Assert(http2Stream._response != null);
                    _http2Stream = http2Stream;
                    _responseMessage = _http2Stream._response;
                    CloseResponseBodyOnDispose = closeResponseBodyOnDispose;
                }
 
                ~Http2ReadWriteStream()
                {
                    if (NetEventSource.Log.IsEnabled()) _http2Stream?.Trace("");
                    try
                    {
                        Dispose(disposing: false);
                    }
                    catch (Exception e)
                    {
                        if (NetEventSource.Log.IsEnabled()) _http2Stream?.Trace($"Error: {e}");
                    }
                }
 
                protected bool CloseResponseBodyOnDispose { get; private init; }
 
                protected override void Dispose(bool disposing)
                {
                    Http2Stream? http2Stream = Interlocked.Exchange(ref _http2Stream, null);
                    if (http2Stream == null)
                    {
                        return;
                    }
 
                    // Technically we shouldn't be doing the following work when disposing == false,
                    // as the following work relies on other finalizable objects.  But given the HTTP/2
                    // protocol, we have little choice: if someone drops the Http2ReadStream without
                    // disposing of it, we need to a) signal to the server that the stream is being
                    // canceled, and b) clean up the associated state in the Http2Connection.
                    if (CloseResponseBodyOnDispose)
                    {
                        http2Stream.CloseResponseBody();
                    }
 
                    base.Dispose(disposing);
                }
 
                public override bool CanRead => _http2Stream != null;
                public override bool CanWrite => _http2Stream != null;
 
                public override int Read(Span<byte> destination)
                {
                    Http2Stream? http2Stream = _http2Stream;
                    ObjectDisposedException.ThrowIf(http2Stream is null, this);
 
                    return http2Stream.ReadData(destination, _responseMessage);
                }
 
                public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
                {
                    Http2Stream? http2Stream = _http2Stream;
 
                    if (http2Stream == null)
                    {
                        return ValueTask.FromException<int>(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Http2ReadStream))));
                    }
 
                    if (cancellationToken.IsCancellationRequested)
                    {
                        return ValueTask.FromCanceled<int>(cancellationToken);
                    }
 
                    return http2Stream.ReadDataAsync(destination, _responseMessage, cancellationToken);
                }
 
                public override void CopyTo(Stream destination, int bufferSize)
                {
                    ValidateCopyToArguments(destination, bufferSize);
                    Http2Stream http2Stream = _http2Stream ?? throw ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Http2ReadStream)));
                    http2Stream.CopyTo(_responseMessage, destination, bufferSize);
                }
 
                public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
                {
                    ValidateCopyToArguments(destination, bufferSize);
                    Http2Stream? http2Stream = _http2Stream;
                    return
                        http2Stream is null ? Task.FromException<int>(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Http2ReadStream)))) :
                        cancellationToken.IsCancellationRequested ? Task.FromCanceled<int>(cancellationToken) :
                        http2Stream.CopyToAsync(_responseMessage, destination, bufferSize, cancellationToken);
                }
 
                public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
                {
 
                    Http2Stream? http2Stream = _http2Stream;
 
                    if (http2Stream == null)
                    {
                        return ValueTask.FromException(new ObjectDisposedException(nameof(Http2WriteStream)));
                    }
 
                    return http2Stream.SendDataAsync(buffer, cancellationToken);
                }
 
                public override Task FlushAsync(CancellationToken cancellationToken)
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        return Task.FromCanceled(cancellationToken);
                    }
 
                    Http2Stream? http2Stream = _http2Stream;
 
                    if (http2Stream == null)
                    {
                        return Task.CompletedTask;
                    }
 
                    // In order to flush this stream's previous writes, we need to flush the connection. We
                    // really only need to do any work here if the connection's buffer has any pending writes
                    // from this stream, but we currently lack a good/efficient/safe way of doing that.
                    return http2Stream._connection.FlushAsync(cancellationToken);
                }
            }
        }
    }
}