File: Internal\SocketConnection.cs
Web Access
Project: src\src\Servers\Kestrel\Transport.Sockets\src\Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.csproj (Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
 
internal sealed partial class SocketConnection : TransportConnection
{
    private static readonly int MinAllocBufferSize = PinnedBlockMemoryPool.BlockSize / 2;
 
    private readonly Socket _socket;
    private readonly ILogger _logger;
    private readonly SocketReceiver _receiver;
    private SocketSender? _sender;
    private readonly SocketSenderPool _socketSenderPool;
    private readonly IDuplexPipe _originalTransport;
    private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
 
    private readonly Lock _shutdownLock = new();
    private volatile Exception? _shutdownReason;
    private Task? _sendingTask;
    private Task? _receivingTask;
    private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource();
    private bool _connectionClosed;
    private readonly bool _waitForData;
    private readonly bool _finOnError;
 
    internal SocketConnection(Socket socket,
                              MemoryPool<byte> memoryPool,
                              PipeScheduler socketScheduler,
                              ILogger logger,
                              SocketSenderPool socketSenderPool,
                              PipeOptions inputOptions,
                              PipeOptions outputOptions,
                              bool waitForData = true,
                              bool finOnError = false)
    {
        Debug.Assert(socket != null);
        Debug.Assert(memoryPool != null);
        Debug.Assert(logger != null);
 
        _socket = socket;
        MemoryPool = memoryPool;
        _logger = logger;
        _waitForData = waitForData;
        _socketSenderPool = socketSenderPool;
        _finOnError = finOnError;
 
        LocalEndPoint = _socket.LocalEndPoint;
        RemoteEndPoint = _socket.RemoteEndPoint;
 
        ConnectionClosed = _connectionClosedTokenSource.Token;
 
        _receiver = new SocketReceiver(socketScheduler);
 
        var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);
 
        // Set the transport and connection id
        Transport = _originalTransport = pair.Transport;
        Application = pair.Application;
 
        InitializeFeatures();
    }
 
    public PipeWriter Input => Application.Output;
 
    public PipeReader Output => Application.Input;
 
    public override MemoryPool<byte> MemoryPool { get; }
 
    public void Start()
    {
        try
        {
            // Spawn send and receive logic
            _receivingTask = DoReceive();
            _sendingTask = DoSend();
        }
        catch (Exception ex)
        {
            _logger.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
        }
    }
 
    public override void Abort(ConnectionAbortedException abortReason)
    {
        // Try to gracefully close the socket to match libuv behavior.
        Shutdown(abortReason);
 
        // Cancel ProcessSends loop after calling shutdown to ensure the correct _shutdownReason gets set.
        Output.CancelPendingRead();
    }
 
    // Only called after connection middleware is complete which means the ConnectionClosed token has fired.
    public override async ValueTask DisposeAsync()
    {
        _originalTransport.Input.Complete();
        _originalTransport.Output.Complete();
 
        try
        {
            // Now wait for both to complete
            if (_receivingTask != null)
            {
                await _receivingTask;
            }
 
            if (_sendingTask != null)
            {
                await _sendingTask;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
        }
        finally
        {
            _receiver.Dispose();
            _sender?.Dispose();
        }
 
        _connectionClosedTokenSource.Dispose();
    }
 
    private async Task DoReceive()
    {
        Exception? error = null;
 
        try
        {
            while (_shutdownReason is null)
            {
                if (_waitForData)
                {
                    // Wait for data before allocating a buffer.
                    var waitForDataResult = await _receiver.WaitForDataAsync(_socket);
 
                    if (!IsNormalCompletion(waitForDataResult))
                    {
                        break;
                    }
                }
 
                // Ensure we have some reasonable amount of buffer space
                var buffer = Input.GetMemory(MinAllocBufferSize);
 
                var receiveResult = await _receiver.ReceiveAsync(_socket, buffer);
 
                if (!IsNormalCompletion(receiveResult))
                {
                    break;
                }
 
                var bytesReceived = receiveResult.BytesTransferred;
 
                if (bytesReceived == 0)
                {
                    // FIN
                    SocketsLog.ConnectionReadFin(_logger, this);
                    break;
                }
 
                Input.Advance(bytesReceived);
 
                var flushTask = Input.FlushAsync();
 
                var paused = !flushTask.IsCompleted;
 
                if (paused)
                {
                    SocketsLog.ConnectionPause(_logger, this);
                }
 
                var result = await flushTask;
 
                if (paused)
                {
                    SocketsLog.ConnectionResume(_logger, this);
                }
 
                if (result.IsCompleted || result.IsCanceled)
                {
                    // Pipe consumer is shut down, do we stop writing
                    break;
                }
 
                bool IsNormalCompletion(SocketOperationResult result)
                {
                    // There's still a small chance that both DoReceive() and DoSend() can log the same connection reset.
                    // Both logs will have the same ConnectionId. I don't think it's worthwhile to lock just to avoid this.
                    // When _shutdownReason is set, error is ignored, so it does not need to be initialized.
                    if (_shutdownReason is not null)
                    {
                        return false;
                    }
 
                    if (!result.HasError)
                    {
                        return true;
                    }
 
                    if (IsConnectionResetError(result.SocketError.SocketErrorCode))
                    {
                        var ex = result.SocketError;
                        error = new ConnectionResetException(ex.Message, ex);
 
                        SocketsLog.ConnectionReset(_logger, this);
 
                        return false;
                    }
 
                    if (IsConnectionAbortError(result.SocketError.SocketErrorCode))
                    {
                        error = result.SocketError;
 
                        // This is unexpected if the socket hasn't been disposed yet.
                        SocketsLog.ConnectionError(_logger, this, error);
 
                        return false;
                    }
 
                    // This is unexpected.
                    error = result.SocketError;
                    SocketsLog.ConnectionError(_logger, this, error);
 
                    return false;
                }
            }
        }
        catch (ObjectDisposedException ex)
        {
            // This exception should always be ignored because _shutdownReason should be set.
            error = ex;
 
            if (_shutdownReason is not null)
            {
                // This is unexpected if the socket hasn't been disposed yet.
                SocketsLog.ConnectionError(_logger, this, error);
            }
        }
        catch (Exception ex)
        {
            // This is unexpected.
            error = ex;
            SocketsLog.ConnectionError(_logger, this, error);
        }
        finally
        {
            // If Shutdown() has already been called, assume that was the reason ProcessReceives() exited.
            Input.Complete(_shutdownReason ?? error);
 
            FireConnectionClosed();
 
            await _waitForConnectionClosedTcs.Task;
        }
    }
 
    private async Task DoSend()
    {
        Exception? shutdownReason = null;
        Exception? unexpectedError = null;
 
        try
        {
            while (true)
            {
                var result = await Output.ReadAsync();
 
                if (result.IsCanceled)
                {
                    break;
                }
                var buffer = result.Buffer;
 
                if (!buffer.IsEmpty)
                {
                    _sender = _socketSenderPool.Rent();
                    var transferResult = await _sender.SendAsync(_socket, buffer);
 
                    if (transferResult.HasError)
                    {
                        if (IsConnectionResetError(transferResult.SocketError.SocketErrorCode))
                        {
                            var ex = transferResult.SocketError;
                            shutdownReason = new ConnectionResetException(ex.Message, ex);
                            SocketsLog.ConnectionReset(_logger, this);
 
                            break;
                        }
 
                        if (IsConnectionAbortError(transferResult.SocketError.SocketErrorCode))
                        {
                            shutdownReason = transferResult.SocketError;
 
                            break;
                        }
 
                        unexpectedError = shutdownReason = transferResult.SocketError;
                    }
 
                    // We don't return to the pool if there was an exception, and
                    // we keep the _sender assigned so that we can dispose it in StartAsync.
                    _socketSenderPool.Return(_sender);
                    _sender = null;
                }
 
                Output.AdvanceTo(buffer.End);
 
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (ObjectDisposedException ex)
        {
            // This should always be ignored since Shutdown() must have already been called by Abort().
            shutdownReason = ex;
        }
        catch (Exception ex)
        {
            shutdownReason = ex;
            unexpectedError = ex;
            SocketsLog.ConnectionError(_logger, this, unexpectedError);
        }
        finally
        {
            Shutdown(shutdownReason);
 
            // Complete the output after disposing the socket
            Output.Complete(unexpectedError);
 
            // Cancel any pending flushes so that the input loop is un-paused
            Input.CancelPendingFlush();
        }
    }
 
    private void FireConnectionClosed()
    {
        // Guard against scheduling this multiple times
        if (_connectionClosed)
        {
            return;
        }
 
        _connectionClosed = true;
 
        ThreadPool.UnsafeQueueUserWorkItem(state =>
        {
            state.CancelConnectionClosedToken();
 
            state._waitForConnectionClosedTcs.TrySetResult();
        },
        this,
        preferLocal: false);
    }
 
    private void Shutdown(Exception? shutdownReason)
    {
        lock (_shutdownLock)
        {
            if (_shutdownReason is not null)
            {
                return;
            }
 
            // Make sure to dispose the socket after the volatile _shutdownReason is set.
            // Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when
            // a BadHttpRequestException is thrown instead of a TaskCanceledException.
            //
            // The shutdownReason argument should only be null if the output was completed gracefully, so no one should ever
            // ever observe this ConnectionAbortedException except for connection middleware attempting
            // to half close the connection which is currently unsupported. The message is always logged though.
            _shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully.");
 
            // NB: not _shutdownReason since we don't want to do this on graceful completion
            if (!_finOnError && shutdownReason is not null)
            {
                SocketsLog.ConnectionWriteRst(_logger, this, shutdownReason.Message);
 
                // This forces an abortive close with linger time 0 (and implies Dispose)
                _socket.Close(timeout: 0);
                return;
            }
 
            SocketsLog.ConnectionWriteFin(_logger, this, _shutdownReason.Message);
 
            try
            {
                _socket.Shutdown(SocketShutdown.Both);
            }
            catch
            {
                // Ignore any errors from Socket.Shutdown() since we're tearing down the connection anyway.
            }
 
            _socket.Dispose();
        }
    }
 
    private void CancelConnectionClosedToken()
    {
        try
        {
            _connectionClosedTokenSource.Cancel();
        }
        catch (Exception ex)
        {
            _logger.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(CancelConnectionClosedToken)}.");
        }
    }
 
    private static bool IsConnectionResetError(SocketError errorCode)
    {
        return errorCode == SocketError.ConnectionReset ||
               errorCode == SocketError.Shutdown ||
               (errorCode == SocketError.ConnectionAborted && OperatingSystem.IsWindows());
    }
 
    private static bool IsConnectionAbortError(SocketError errorCode)
    {
        // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
        return errorCode == SocketError.OperationAborted ||
               errorCode == SocketError.Interrupted ||
               (errorCode == SocketError.InvalidArgument && !OperatingSystem.IsWindows());
    }
}