File: MessagePump.cs
Web Access
Project: src\src\Servers\HttpSys\src\Microsoft.AspNetCore.Server.HttpSys.csproj (Microsoft.AspNetCore.Server.HttpSys)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Linq;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.HttpSys.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
 
namespace Microsoft.AspNetCore.Server.HttpSys;
 
internal sealed partial class MessagePump : IServer, IServerDelegationFeature
{
    private readonly ILogger _logger;
    private readonly HttpSysOptions _options;
 
    private readonly int _maxAccepts;
 
    private volatile int _stopping;
    private int _outstandingRequests;
    private readonly TaskCompletionSource _shutdownSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _shutdownSignalCompleted;
 
    private readonly ServerAddressesFeature _serverAddresses;
 
    public MessagePump(IOptions<HttpSysOptions> options, ILoggerFactory loggerFactory, IAuthenticationSchemeProvider authentication)
    {
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(loggerFactory);
        _options = options.Value;
        Listener = new HttpSysListener(_options, loggerFactory);
        _logger = loggerFactory.CreateLogger<MessagePump>();
 
        if (_options.Authentication.Schemes != AuthenticationSchemes.None)
        {
            authentication.AddScheme(new AuthenticationScheme(HttpSysDefaults.AuthenticationScheme, displayName: _options.Authentication.AuthenticationDisplayName, handlerType: typeof(AuthenticationHandler)));
        }
 
        Features = new FeatureCollection();
        _serverAddresses = new ServerAddressesFeature();
        Features.Set<IServerAddressesFeature>(_serverAddresses);
 
        if (HttpApi.SupportsDelegation)
        {
            Features.Set<IServerDelegationFeature>(this);
        }
 
        _maxAccepts = _options.MaxAccepts;
    }
 
    internal HttpSysListener Listener { get; }
 
    internal IRequestContextFactory? RequestContextFactory { get; set; }
 
    public IFeatureCollection Features { get; }
 
    internal bool Stopping => _stopping == 1;
 
    public Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull
    {
        ArgumentNullException.ThrowIfNull(application);
 
        var hostingUrlsPresent = _serverAddresses.Addresses.Count > 0;
        var serverAddressCopy = _serverAddresses.Addresses.ToList();
        _serverAddresses.Addresses.Clear();
 
        if (_serverAddresses.PreferHostingUrls && hostingUrlsPresent)
        {
            if (_options.UrlPrefixes.Count > 0)
            {
                Log.ClearedPrefixes(_logger, _serverAddresses.Addresses);
 
                Listener.Options.UrlPrefixes.Clear();
            }
 
            UpdateUrlPrefixes(serverAddressCopy);
        }
        else if (_options.UrlPrefixes.Count > 0)
        {
            if (hostingUrlsPresent)
            {
                Log.ClearedAddresses(_logger, _serverAddresses.Addresses);
 
                _serverAddresses.Addresses.Clear();
            }
        }
        else if (hostingUrlsPresent)
        {
            UpdateUrlPrefixes(serverAddressCopy);
        }
        else if (Listener.RequestQueue.Created)
        {
            Log.BindingToDefault(_logger);
 
            Listener.Options.UrlPrefixes.Add(Constants.DefaultServerAddress);
        }
        // else // Attaching to an existing queue, don't add a default.
 
        // Can't start twice
        Debug.Assert(RequestContextFactory == null, "Start called twice!");
 
        Debug.Assert(application != null);
 
        RequestContextFactory = new ApplicationRequestContextFactory<TContext>(application, this);
 
        Listener.Start();
 
        // Update server addresses after we start listening as port 0
        // needs to be selected at the point of binding.
        foreach (var prefix in _options.UrlPrefixes)
        {
            _serverAddresses.Addresses.Add(prefix.FullPrefix);
        }
 
        // Dispatch to get off the SynchronizationContext and use UnsafeQueueUserWorkItem to avoid capturing the ExecutionContext
        ThreadPool.UnsafeQueueUserWorkItem(state => state.ActivateRequestProcessingLimits(), this, preferLocal: false);
 
        return Task.CompletedTask;
    }
 
    private void ActivateRequestProcessingLimits()
    {
        for (var i = 0; i < _maxAccepts; i++)
        {
            ProcessRequestsWorker();
        }
    }
 
    private void UpdateUrlPrefixes(IList<string> serverAddressCopy)
    {
        foreach (var value in serverAddressCopy)
        {
            Listener.Options.UrlPrefixes.Add(value);
        }
    }
 
    internal int IncrementOutstandingRequest()
    {
        return Interlocked.Increment(ref _outstandingRequests);
    }
 
    internal int DecrementOutstandingRequest()
    {
        return Interlocked.Decrement(ref _outstandingRequests);
    }
 
    internal void SetShutdownSignal()
    {
        _shutdownSignal.TrySetResult();
    }
 
    // The message pump.
    // When we start listening for the next request on one thread, we may need to be sure that the
    // completion continues on another thread as to not block the current request processing.
    // The awaits will manage stack depth for us.
    private void ProcessRequestsWorker()
    {
        Debug.Assert(RequestContextFactory != null);
 
        // Allocate and accept context per loop and reuse it for all accepts
        var acceptContext = new AsyncAcceptContext(Listener, RequestContextFactory, _logger);
 
        var loop = new AcceptLoop(acceptContext, this);
 
        ThreadPool.UnsafeQueueUserWorkItem(loop, preferLocal: false);
    }
 
    public Task StopAsync(CancellationToken cancellationToken)
    {
        void RegisterCancelation()
        {
            cancellationToken.Register(() =>
            {
                if (Interlocked.Exchange(ref _shutdownSignalCompleted, 1) == 0)
                {
                    Log.StopCancelled(_logger, _outstandingRequests);
                    _shutdownSignal.TrySetResult();
                }
            });
        }
 
        if (Interlocked.Exchange(ref _stopping, 1) == 1)
        {
            RegisterCancelation();
 
            return _shutdownSignal.Task;
        }
 
        try
        {
            // Wait for active requests to drain
            if (_outstandingRequests > 0)
            {
                Log.WaitingForRequestsToDrain(_logger, _outstandingRequests);
                RegisterCancelation();
            }
            else
            {
                _shutdownSignal.TrySetResult();
            }
        }
        catch (Exception ex)
        {
            _shutdownSignal.TrySetException(ex);
        }
 
        return _shutdownSignal.Task;
    }
 
    public DelegationRule CreateDelegationRule(string queueName, string uri)
    {
        var rule = new DelegationRule(Listener.UrlGroup, queueName, uri, _logger);
        Listener.UrlGroup.SetDelegationProperty(rule.Queue);
        return rule;
    }
 
    public void Dispose()
    {
        _stopping = 1;
        _shutdownSignal.TrySetResult();
 
        Listener.Dispose();
    }
 
    private sealed class AcceptLoop : IThreadPoolWorkItem
    {
        private readonly AsyncAcceptContext _asyncAcceptContext;
        private readonly MessagePump _messagePump;
        private readonly bool _preferInlineScheduling;
 
        public AcceptLoop(AsyncAcceptContext asyncAcceptContext,
                          MessagePump messagePump)
        {
            _asyncAcceptContext = asyncAcceptContext;
            _messagePump = messagePump;
            _preferInlineScheduling = _messagePump._options.UnsafePreferInlineScheduling;
        }
 
        public void Execute()
        {
            _ = ExecuteAsync();
        }
 
        private async Task ExecuteAsync()
        {
            while (!_messagePump.Stopping)
            {
                // Receive a request
                RequestContext requestContext;
                try
                {
                    requestContext = await _messagePump.Listener.AcceptAsync(_asyncAcceptContext);
 
                    if (!_messagePump.Listener.ValidateRequest(requestContext))
                    {
                        // Dispose the request
                        requestContext.ReleasePins();
                        requestContext.Dispose();
 
                        // If either of these is false then a response has already been sent to the client, so we can accept the next request
                        continue;
                    }
                }
                catch (Exception ex)
                {
                    Debug.Assert(_messagePump.Stopping);
                    if (_messagePump.Stopping)
                    {
                        Log.AcceptErrorStopping(_messagePump._logger, ex);
                    }
                    else
                    {
                        Log.AcceptError(_messagePump._logger, ex);
                    }
                    continue;
                }
 
                if (_preferInlineScheduling)
                {
                    try
                    {
                        await requestContext.ExecuteAsync();
                    }
                    catch (Exception ex)
                    {
                        // Request processing failed
                        // Log the error message, release throttle and move on
                        Log.RequestListenerProcessError(_messagePump._logger, ex);
                    }
                }
                else
                {
                    try
                    {
                        // Queue another accept before we execute the request
                        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
 
                        // Use this thread to start the execution of the request (avoid the double threadpool dispatch)
                        await requestContext.ExecuteAsync();
                    }
                    catch (Exception ex)
                    {
                        // Request processing failed
                        // Log the error message, release throttle and move on
                        Log.RequestListenerProcessError(_messagePump._logger, ex);
                    }
 
                    // We're done with this thread, accept loop was continued via ThreadPool.UnsafeQueueUserWorkItem
                    return;
                }
            }
 
            _asyncAcceptContext.Dispose();
        }
    }
}