File: Internal\ConnectionDispatcher.cs
Web Access
Project: src\src\Servers\Kestrel\Core\src\Microsoft.AspNetCore.Server.Kestrel.Core.csproj (Microsoft.AspNetCore.Server.Kestrel.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
 
/// <summary>
/// Drives the accept loop for a connection listener. Every accepted connection is wrapped in a
/// <see cref="KestrelConnection{T}"/>, registered with the transport connection manager and then
/// queued to the thread pool for execution.
/// </summary>
/// <typeparam name="T">The connection context type handed out by the listener.</typeparam>
internal sealed class ConnectionDispatcher<T> where T : BaseConnectionContext
{
    // Completed via TrySetResult once the accept loop exits, whether it ended normally or by an exception.
    private readonly TaskCompletionSource _acceptLoopTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly ServiceContext _serviceContext;
    private readonly TransportConnectionManager _transportConnectionManager;
    private readonly Func<T, Task> _connectionDelegate;

    /// <summary>
    /// Creates a dispatcher that hands accepted connections to <paramref name="connectionDelegate"/>.
    /// </summary>
    /// <param name="serviceContext">Source of the logger and metrics used by the dispatcher.</param>
    /// <param name="connectionDelegate">Delegate passed to each <see cref="KestrelConnection{T}"/>.</param>
    /// <param name="transportConnectionManager">Manager that issues connection ids and tracks connections.</param>
    public ConnectionDispatcher(ServiceContext serviceContext, Func<T, Task> connectionDelegate, TransportConnectionManager transportConnectionManager)
    {
        _transportConnectionManager = transportConnectionManager;
        _connectionDelegate = connectionDelegate;
        _serviceContext = serviceContext;
    }

    private KestrelMetrics Metrics => _serviceContext.Metrics;
    private KestrelTrace Log => _serviceContext.Log;

    /// <summary>
    /// Queues the accept loop for <paramref name="listener"/> onto the thread pool.
    /// </summary>
    /// <param name="listener">The listener to accept connections from.</param>
    /// <returns>A task that completes once the accept loop has ended.</returns>
    public Task StartAcceptingConnections(IConnectionListener<T> listener)
    {
        ThreadPool.UnsafeQueueUserWorkItem(StartAcceptingConnectionsCore, listener, preferLocal: false);
        return _acceptLoopTcs.Task;
    }

    // Thread pool entry point: kicks off the accept loop without awaiting it.
    private void StartAcceptingConnectionsCore(IConnectionListener<T> listener)
    {
        // REVIEW: Multiple accept loops in parallel?
        _ = RunAcceptLoopAsync(listener);
    }

    // Accepts connections until the listener yields null (done listening) or something throws.
    // Failures are logged rather than propagated; the completion source is signalled either way.
    private async Task RunAcceptLoopAsync(IConnectionListener<T> listener)
    {
        try
        {
            // A null result means the listener is done, which ends the loop.
            while (await listener.AcceptAsync() is { } accepted)
            {
                QueueConnection(accepted);
            }
        }
        catch (Exception ex)
        {
            // REVIEW: If the accept loop ends should this trigger a server shutdown? It will manifest as a hang
            Log.LogCritical(0, ex, "The connection listener failed to accept any new connections.");
        }
        finally
        {
            _acceptLoopTcs.TrySetResult();
        }
    }

    // Registers a freshly accepted connection and schedules it on the thread pool.
    private void QueueConnection(T connection)
    {
        // The connection is added to the connection manager before it is queued for execution.
        var connectionId = _transportConnectionManager.GetNewConnectionId();

        // Created once at the start of the connection so the cached counter-enabled state
        // stays consistent for the connection's whole lifetime.
        var metricsContext = Metrics.CreateContext(connection);

        var kestrelConnection = new KestrelConnection<T>(
            connectionId,
            _serviceContext,
            _transportConnectionManager,
            _connectionDelegate,
            connection,
            Log,
            metricsContext);

        _transportConnectionManager.AddConnection(connectionId, kestrelConnection);

        Log.ConnectionAccepted(connection.ConnectionId);
        KestrelEventSource.Log.ConnectionQueuedStart(connection);
        Metrics.ConnectionQueuedStart(metricsContext);

        ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
    }
}