File: System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionPool.Http1.cs
Web Access
Project: src\src\libraries\System.Net.Http\src\System.Net.Http.csproj (System.Net.Http)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Net.Http
{
    internal sealed partial class HttpConnectionPool
    {
        /// <summary>Stack of currently available HTTP/1.1 connections stored in the pool.</summary>
        private readonly ConcurrentStack<HttpConnection> _http11Connections = new();
        /// <summary>Controls whether we can use a fast path when returning connections to the pool and skip calling into <see cref="ProcessHttp11RequestQueue(HttpConnection?)"/>.</summary>
        private bool _http11RequestQueueIsEmptyAndNotDisposed;
        /// <summary>The maximum number of HTTP/1.1 connections allowed to be associated with the pool.</summary>
        private readonly int _maxHttp11Connections;
        /// <summary>The number of HTTP/1.1 connections associated with the pool, including in use, available, and pending.</summary>
        private int _associatedHttp11ConnectionCount;
        /// <summary>The number of HTTP/1.1 connections that are in the process of being established.</summary>
        private int _pendingHttp11ConnectionCount;
        /// <summary>Queue of requests waiting for an HTTP/1.1 connection.</summary>
        private RequestQueue<HttpConnection> _http11RequestQueue;
 
        /// <summary>For non-proxy connection pools, this is the host name in bytes; for proxies, null.</summary>
        private readonly byte[]? _hostHeaderLineBytes;
 
        public byte[]? HostHeaderLineBytes => _hostHeaderLineBytes;
 
        private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out HttpConnection? connection, [NotNullWhen(false)] out HttpConnectionWaiter<HttpConnection>? waiter)
        {
            while (_http11Connections.TryPop(out connection))
            {
                if (CheckExpirationOnGet(connection))
                {
                    if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired HTTP/1.1 connection in pool.");
                    connection.Dispose();
                    continue;
                }
 
                if (!connection.PrepareForReuse(async))
                {
                    if (NetEventSource.Log.IsEnabled()) connection.Trace("Found invalid HTTP/1.1 connection in pool.");
                    connection.Dispose();
                    continue;
                }
 
                if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/1.1 connection in pool.");
                waiter = null;
                return true;
            }
 
            // Slow path - no available connection found.
            // Push the request onto the request queue and check if we should inject a new connection.
 
            waiter = new HttpConnectionWaiter<HttpConnection>();
 
            // Technically this block under the lock could be a part of ProcessHttp11RequestQueue to avoid taking the lock twice.
            // It is kept separate to simplify that method (avoid extra arguments that are only relevant for this caller).
            lock (SyncObj)
            {
                _http11RequestQueue.EnqueueRequest(request, waiter);
 
                // Disable the fast path and force connections returned to the pool to check the request queue first.
                _http11RequestQueueIsEmptyAndNotDisposed = false;
            }
 
            // Other threads may have added a connection to the pool before we were able to
            // add the request to the queue, so we must check for an available connection again.
 
            ProcessHttp11RequestQueue(null);
            return false;
        }
 
        /// <summary>
        /// This method is called:
        /// <br/>- When returning a connection and observing that the request queue is not empty (<see cref="_http11RequestQueueIsEmptyAndNotDisposed"/> is <see langword="false"/>).
        /// <br/>- After adding a request to the queue if we fail to obtain a connection from <see cref="_http11Connections"/>.
        /// <br/>- After scavenging or disposing the pool to ensure that any pending requests are handled or connections disposed.
        /// <para>The method will attempt to match one request from the <see cref="_http11RequestQueue"/> to an available connection.
        /// The <paramref name="connection"/> can either be provided as an argument (when returning a connection to the pool), or one will be rented from <see cref="_http11Connections"/>.
        /// As we'll only process a single request, we are expecting the method to be called every time a request is enqueued, and every time a connection is returned while the request queue is not empty.</para>
        /// <para>If the <see cref="_http11RequestQueue"/> becomes empty, this method will reset the <see cref="_http11RequestQueueIsEmptyAndNotDisposed"/> flag back to <see langword="true"/>,
        /// such that returning connections will use the fast path again and skip calling into this method.</para>
        /// <para>Notably, this method will not be called on the fast path as long as we have enough connections to handle all new requests.</para>
        /// </summary>
        /// <param name="connection">The connection to use for a pending request, or return to the pool.</param>
        private void ProcessHttp11RequestQueue(HttpConnection? connection)
        {
            // Loop in case the request we try to signal was already cancelled or handled by a different connection.
            while (true)
            {
                HttpConnectionWaiter<HttpConnection>? waiter = null;
 
                lock (SyncObj)
                {
#if DEBUG
                    // Other threads may still interact with the connections stack. Read the count once to keep the assert message accurate.
                    int connectionCount = _http11Connections.Count;
                    Debug.Assert(_associatedHttp11ConnectionCount >= connectionCount + _pendingHttp11ConnectionCount,
                        $"Expected {_associatedHttp11ConnectionCount} >= {connectionCount} + {_pendingHttp11ConnectionCount}");
#endif
                    Debug.Assert(_associatedHttp11ConnectionCount <= _maxHttp11Connections,
                        $"Expected {_associatedHttp11ConnectionCount} <= {_maxHttp11Connections}");
                    Debug.Assert(_associatedHttp11ConnectionCount >= _pendingHttp11ConnectionCount,
                        $"Expected {_associatedHttp11ConnectionCount} >= {_pendingHttp11ConnectionCount}");
 
                    if (_http11RequestQueue.Count != 0)
                    {
                        if (connection is not null || _http11Connections.TryPop(out connection))
                        {
                            // If the connection is new, this check will always succeed as there is no scavenging task pending.
                            if (!connection.TryOwnScavengingTaskCompletion())
                            {
                                goto DisposeConnection;
                            }
 
                            // TryDequeueWaiter will prune completed requests from the head of the queue,
                            // so it's possible for it to return false even though we checked that Count != 0.
                            bool success = _http11RequestQueue.TryDequeueWaiter(this, out waiter);
                            Debug.Assert(success == waiter is not null);
                        }
                    }
 
                    // Update the empty queue flag now.
                    // If the request queue is now empty, returning connections will use the fast path and skip calling into this method.
                    _http11RequestQueueIsEmptyAndNotDisposed = _http11RequestQueue.Count == 0 && !_disposed;
 
                    if (waiter is null)
                    {
                        // We didn't find a waiter to signal, or there were no connections available.
 
                        if (connection is not null)
                        {
                            // A connection was provided to this method, or we rented one from the pool.
                            // Return it back to the pool since we're not going to use it yet.
 
                            // We're returning it while holding the lock to avoid a scenario where
                            // - thread A sees no requests are waiting in the queue (current thread)
                            // - thread B adds a request to the queue, and sees no connections are available
                            // - thread A returns the connection to the pool
                            // We'd have both a connection and a request waiting in the pool, but nothing to pair the two.
 
                            // The main scenario where we'll reach this branch is when we enqueue a request to the queue
                            // and set the _http11RequestQueueIsEmptyAndNotDisposed flag to false, followed by multiple
                            // returning connections observing the flag and calling into this method before we clear the flag.
                            // This should be a relatively rare case, so the added contention should be minimal.
 
                            // We took ownership of the scavenging task completion.
                            // If we can't return the completion (the task already completed), we must dispose the connection.
                            if (!connection.TryReturnScavengingTaskCompletionOwnership())
                            {
                                goto DisposeConnection;
                            }
 
                            _http11Connections.Push(connection);
                        }
                        else
                        {
                            // We may be out of available connections, check if we should inject a new one.
                            CheckForHttp11ConnectionInjection();
                        }
 
                        break;
                    }
                }
 
                Debug.Assert(connection is not null);
 
                if (waiter.TrySignal(connection))
                {
                    // Success. Note that we did not call connection.PrepareForReuse
                    // before signaling the waiter. This is intentional, as the fact that
                    // this method was called indicates that the connection is either new,
                    // or was just returned to the pool and is still in a good state.
                    //
                    // We must, however, take ownership of the scavenging task completion as
                    // there is a small chance that such a task was started if the connection
                    // was briefly returned to the pool.
                    return;
                }
 
                // The request was already cancelled or handled by a different connection.
 
                // We took ownership of the scavenging task completion.
                // If we can't return the completion (the task already completed), we must dispose the connection.
                if (!connection.TryReturnScavengingTaskCompletionOwnership())
                {
                    goto DisposeConnection;
                }
 
                // Loop again to try to find another request to signal, or return the connection.
                continue;
 
            DisposeConnection:
                // The scavenging task completed before we assigned a request to the connection.
                // We've received EOF/erroneous data and the connection is not usable anymore.
                // Throw it away and try again.
                connection.Dispose();
                connection = null;
            }
 
            if (_disposed)
            {
                // The pool is being disposed and there are no more requests to handle.
                // Clean up any idle connections still waiting in the pool.
                while (_http11Connections.TryPop(out connection))
                {
                    connection.Dispose();
                }
            }
        }
 
        private void CheckForHttp11ConnectionInjection()
        {
            Debug.Assert(HasSyncObjLock);
 
            _http11RequestQueue.PruneCompletedRequestsFromHeadOfQueue(this);
 
            // Determine if we can and should add a new connection to the pool.
            bool willInject =
                _http11RequestQueue.Count > _pendingHttp11ConnectionCount &&    // More requests queued than pending connections
                _associatedHttp11ConnectionCount < _maxHttp11Connections &&     // Under the connection limit
                _http11RequestQueue.RequestsWithoutAConnectionAttempt > 0;      // There are requests we haven't issued a connection attempt for
 
            if (NetEventSource.Log.IsEnabled())
            {
                Trace($"Available HTTP/1.1 connections: {_http11Connections.Count}, Requests in the queue: {_http11RequestQueue.Count}, " +
                    $"Requests without a connection attempt: {_http11RequestQueue.RequestsWithoutAConnectionAttempt}, " +
                    $"Pending HTTP/1.1 connections: {_pendingHttp11ConnectionCount}, Total associated HTTP/1.1 connections: {_associatedHttp11ConnectionCount}, " +
                    $"Max HTTP/1.1 connection limit: {_maxHttp11Connections}, " +
                    $"Will inject connection: {willInject}.");
            }
 
            if (willInject)
            {
                _associatedHttp11ConnectionCount++;
                _pendingHttp11ConnectionCount++;
 
                RequestQueue<HttpConnection>.QueueItem queueItem = _http11RequestQueue.PeekNextRequestForConnectionAttempt();
                _ = InjectNewHttp11ConnectionAsync(queueItem); // ignore returned task
            }
        }
 
        private async Task InjectNewHttp11ConnectionAsync(RequestQueue<HttpConnection>.QueueItem queueItem)
        {
            if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/1.1 connection for pool.");
 
            // Queue the remainder of the work so that this method completes quickly
            // and escapes locks held by the caller.
            await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
 
            HttpConnectionWaiter<HttpConnection> waiter = queueItem.Waiter;
            HttpConnection? connection = null;
            Exception? connectionException = null;
 
            CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource();
            waiter.ConnectionCancellationTokenSource = cts;
            try
            {
                connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                connectionException = e is OperationCanceledException oce && oce.CancellationToken == cts.Token && !waiter.CancelledByOriginatingRequestCompletion ?
                    CreateConnectTimeoutException(oce) :
                    e;
            }
            finally
            {
                lock (waiter)
                {
                    waiter.ConnectionCancellationTokenSource = null;
                    cts.Dispose();
                }
            }
 
            if (connection is not null)
            {
                // Add the established connection to the pool.
                AddNewHttp11Connection(connection, queueItem.Waiter);
            }
            else
            {
                Debug.Assert(connectionException is not null);
                HandleHttp11ConnectionFailure(waiter, connectionException);
            }
        }
 
        internal async ValueTask<HttpConnection> CreateHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
        {
            (Stream stream, TransportContext? transportContext, Activity? activity,  IPEndPoint? remoteEndPoint) = await ConnectAsync(request, async, cancellationToken).ConfigureAwait(false);
            return await ConstructHttp11ConnectionAsync(async, stream, transportContext, request, activity, remoteEndPoint, cancellationToken).ConfigureAwait(false);
        }
 
        private async ValueTask<HttpConnection> ConstructHttp11ConnectionAsync(bool async, Stream stream, TransportContext? transportContext, HttpRequestMessage request, Activity? activity, IPEndPoint? remoteEndPoint, CancellationToken cancellationToken)
        {
            Stream newStream = await ApplyPlaintextFilterAsync(async, stream, HttpVersion.Version11, request, cancellationToken).ConfigureAwait(false);
            return new HttpConnection(this, newStream, transportContext, activity, remoteEndPoint);
        }
 
        private void HandleHttp11ConnectionFailure(HttpConnectionWaiter<HttpConnection>? requestWaiter, Exception e)
        {
            if (NetEventSource.Log.IsEnabled()) Trace($"HTTP/1.1 connection failed: {e}");
 
            // If this is happening as part of an HTTP/2 => HTTP/1.1 downgrade, we won't have an HTTP/1.1 waiter associated with this request
            // We don't care if this fails; that means the request was previously canceled or handled by a different connection.
            requestWaiter?.TrySetException(e);
 
            lock (SyncObj)
            {
                Debug.Assert(_associatedHttp11ConnectionCount > 0);
                Debug.Assert(_pendingHttp11ConnectionCount > 0);
 
                _associatedHttp11ConnectionCount--;
                _pendingHttp11ConnectionCount--;
 
                CheckForHttp11ConnectionInjection();
            }
        }
 
        public void RecycleHttp11Connection(HttpConnection connection)
        {
            if (CheckExpirationOnReturn(connection))
            {
                if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/1.1 connection when returning to pool. Connection lifetime expired.");
                connection.Dispose();
                return;
            }
 
            ReturnHttp11Connection(connection);
        }
 
        private void AddNewHttp11Connection(HttpConnection connection, HttpConnectionWaiter<HttpConnection>? initialRequestWaiter)
        {
            if (NetEventSource.Log.IsEnabled()) Trace("");
 
            lock (SyncObj)
            {
                Debug.Assert(_pendingHttp11ConnectionCount > 0);
                _pendingHttp11ConnectionCount--;
 
                if (initialRequestWaiter is not null)
                {
                    // If we're about to signal the initial waiter, that request must be removed from the queue if it was at the head to avoid rooting it forever.
                    // Normally, TryDequeueWaiter would handle the removal. TryDequeueSpecificWaiter matches this behavior for the initial request case.
                    // We don't care if this fails; that means the request was previously canceled, handled by a different connection, or not at the head of the queue.
                    _http11RequestQueue.TryDequeueSpecificWaiter(initialRequestWaiter);
 
                    // There's no need for us to hold the lock while signaling the waiter.
                }
            }
 
            if (initialRequestWaiter is not null &&
                initialRequestWaiter.TrySignal(connection))
            {
                return;
            }
 
            ReturnHttp11Connection(connection);
        }
 
        private void ReturnHttp11Connection(HttpConnection connection)
        {
            connection.MarkConnectionAsIdle();
 
            // The fast path when there are enough connections and no pending requests
            // is that we'll see _http11RequestQueueIsEmptyAndNotDisposed being true both
            // times, and all we'll have to do as part of returning the connection is
            // a Push call on the concurrent stack.
 
            if (Volatile.Read(ref _http11RequestQueueIsEmptyAndNotDisposed))
            {
                _http11Connections.Push(connection);
 
                // When we add a connection to the pool, we must ensure that there are
                // either no pending requests waiting, or that _something_ will pair those
                // requests with the connection we just added.
 
                // When adding a request to the queue, we'll first check if there's
                // an available connection waiting in the pool that we could use.
                // If there isn't, we'll set the _http11RequestQueueIsEmptyAndNotDisposed
                // flag and check for available connections again.
 
                // To avoid a race where we add the connection after a request was enqueued,
                // we'll check the flag again and try to process one request from the queue.
 
                if (!Volatile.Read(ref _http11RequestQueueIsEmptyAndNotDisposed))
                {
                    ProcessHttp11RequestQueue(null);
                }
            }
            else
            {
                // ProcessHttp11RequestQueue is responsible for handing the connection to a pending request,
                // or to return it back to the pool if there aren't any.
 
                // We hand over the connection directly instead of pushing it on the stack first to ensure
                // that pending requests are processed in a fair (FIFO) order.
                ProcessHttp11RequestQueue(connection);
            }
        }
 
        /// <summary>
        /// Called when an HttpConnection from this pool is no longer usable.
        /// Note, this is always called from HttpConnection.Dispose, which is a bit different than how HTTP2 works.
        /// </summary>
        public void InvalidateHttp11Connection(HttpConnection connection, bool disposing = true)
        {
            lock (SyncObj)
            {
                Debug.Assert(_associatedHttp11ConnectionCount > 0);
                Debug.Assert(!disposing || Array.IndexOf(_http11Connections.ToArray(), connection) < 0);
 
                _associatedHttp11ConnectionCount--;
 
                CheckForHttp11ConnectionInjection();
            }
        }
 
        private static void ScavengeHttp11ConnectionStack(HttpConnectionPool pool, ConcurrentStack<HttpConnection> connections, ref List<HttpConnectionBase>? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout)
        {
            // We can't simply enumerate the connections stack as other threads may still be adding and removing entries.
            // If we want to check the state of a connection, we must take it from the stack first to ensure we own it.
 
            // We're about to starve the connection pool of all available connections for a moment.
            // We must be holding the lock while doing so to ensure that any new requests that
            // come in during this time will be blocked waiting in ProcessHttp11RequestQueue.
            // If this were not the case, requests would repeatedly call into CheckForHttp11ConnectionInjection
            // and trigger new connection attempts, even if we have enough connections in our copy.
            Debug.Assert(pool.HasSyncObjLock);
            Debug.Assert(connections.Count <= pool._associatedHttp11ConnectionCount);
 
            HttpConnection[] stackCopy = ArrayPool<HttpConnection>.Shared.Rent(pool._associatedHttp11ConnectionCount);
            int usableConnections = 0;
 
            while (connections.TryPop(out HttpConnection? connection))
            {
                if (connection.IsUsable(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
                {
                    stackCopy[usableConnections++] = connection;
                }
                else
                {
                    toDispose ??= new List<HttpConnectionBase>();
                    toDispose.Add(connection);
                }
            }
 
            if (usableConnections > 0)
            {
                // Add them back in reverse to maintain the LIFO order.
                Span<HttpConnection> usable = stackCopy.AsSpan(0, usableConnections);
                usable.Reverse();
                connections.PushRange(stackCopy, 0, usableConnections);
                usable.Clear();
            }
 
            ArrayPool<HttpConnection>.Shared.Return(stackCopy);
        }
    }
}