File: Internal\NamedPipeConnectionListener.cs
Web Access
Project: src\src\Servers\Kestrel\Transport.NamedPipes\src\Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.csproj (Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Net;
using System.Threading.Channels;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using NamedPipeOptions = System.IO.Pipes.PipeOptions;
using PipeOptions = System.IO.Pipelines.PipeOptions;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
 
internal sealed class NamedPipeConnectionListener : IConnectionListener
{
    private readonly ILogger _log;
    private readonly NamedPipeEndPoint _endpoint;
    private readonly NamedPipeTransportOptions _options;
    private readonly ObjectPool<NamedPipeServerStream> _namedPipeServerStreamPool;
    private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource();
    private readonly CancellationToken _listeningToken;
    private readonly Channel<ConnectionContext> _acceptedQueue;
    private readonly MemoryPool<byte> _memoryPool;
    private readonly PipeOptions _inputOptions;
    private readonly PipeOptions _outputOptions;
    private readonly NamedPipeServerStreamPoolPolicy _poolPolicy;
    private Task? _completeListeningTask;
    private int _disposed;
 
    public NamedPipeConnectionListener(
        NamedPipeEndPoint endpoint,
        NamedPipeTransportOptions options,
        ILoggerFactory loggerFactory,
        ObjectPoolProvider objectPoolProvider)
    {
        _log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes");
        _endpoint = endpoint;
        _options = options;
        _memoryPool = options.MemoryPoolFactory();
        _listeningToken = _listeningTokenSource.Token;
        // Have to create the pool here (instead of DI) because the pool is specific to an endpoint.
        _poolPolicy = new NamedPipeServerStreamPoolPolicy(endpoint, options);
        _namedPipeServerStreamPool = objectPoolProvider.Create(_poolPolicy);
 
        // The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection.
        // We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream
        // when AcceptAsync is called, so that the server is always the owner of the pipe name.
        _acceptedQueue = Channel.CreateBounded<ConnectionContext>(new BoundedChannelOptions(capacity: 1));
 
        var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
        var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
 
        _inputOptions = new PipeOptions(_memoryPool, PipeScheduler.ThreadPool, PipeScheduler.Inline, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
        _outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
    }
 
    internal void ReturnStream(NamedPipeServerStream stream)
    {
        Debug.Assert(!stream.IsConnected, "Stream should have been successfully disconnected to reach this point.");
 
        // The stream is automatically disposed if there isn't space in the pool.
        _namedPipeServerStreamPool.Return(stream);
    }
 
    public void Start()
    {
        Debug.Assert(_completeListeningTask == null, "Already started");
 
        var listeningTasks = new Task[_options.ListenerQueueCount];
 
        for (var i = 0; i < listeningTasks.Length; i++)
        {
            // Start first stream inline to catch creation errors.
            var initialStream = _namedPipeServerStreamPool.Get();
            _poolPolicy.SetFirstPipeStarted();
 
            listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
        }
 
        _completeListeningTask = Task.Run(async () =>
        {
            try
            {
                await Task.WhenAll(listeningTasks);
                _acceptedQueue.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                _acceptedQueue.Writer.TryComplete(ex);
                NamedPipeLog.ConnectionListenerAborted(_log, ex);
            }
        });
    }
 
    public EndPoint EndPoint => _endpoint;
 
    private async Task StartAsync(NamedPipeServerStream nextStream)
    {
        while (true)
        {
            try
            {
                var stream = nextStream;
 
                await stream.WaitForConnectionAsync(_listeningToken);
 
                var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
                connection.Start();
 
                // Create the next stream before writing connected stream to the channel.
                // This ensures there is always a created stream and another process can't
                // create a stream with the same name with different a access policy.
                nextStream = _namedPipeServerStreamPool.Get();
 
                while (!_acceptedQueue.Writer.TryWrite(connection))
                {
                    if (!await _acceptedQueue.Writer.WaitToWriteAsync(_listeningToken))
                    {
                        throw new InvalidOperationException("Accept queue writer was unexpectedly closed.");
                    }
                }
            }
            catch (IOException ex) when (!_listeningToken.IsCancellationRequested)
            {
                // WaitForConnectionAsync can throw IOException when the pipe is broken.
                NamedPipeLog.ConnectionListenerBrokenPipe(_log, ex);
 
                // Dispose existing pipe, create a new one and continue accepting.
                nextStream.Dispose();
                nextStream = _namedPipeServerStreamPool.Get();
            }
            catch (OperationCanceledException) when (_listeningToken.IsCancellationRequested)
            {
                // Token was canceled. The listener is shutting down.
                break;
            }
        }
 
        NamedPipeLog.ConnectionListenerQueueExited(_log);
        nextStream.Dispose();
    }
 
    public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
    {
        while (await _acceptedQueue.Reader.WaitToReadAsync(cancellationToken))
        {
            if (_acceptedQueue.Reader.TryRead(out var connection))
            {
                NamedPipeLog.AcceptedConnection(_log, connection);
                return connection;
            }
        }
 
        return null;
    }
 
    public ValueTask UnbindAsync(CancellationToken cancellationToken = default) => DisposeAsync();
 
    public async ValueTask DisposeAsync()
    {
        // A stream may be waiting on WaitForConnectionAsync when dispose happens.
        // Cancel the token before dispose to ensure StartAsync exits.
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _listeningTokenSource.Cancel();
        }
 
        _listeningTokenSource.Dispose();
        if (_completeListeningTask != null)
        {
            await _completeListeningTask;
        }
 
        // Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
        // Important to dispose because this empties and disposes streams in the pool.
        (_namedPipeServerStreamPool as IDisposable)?.Dispose();
    }
 
    private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy<NamedPipeServerStream>
    {
        private readonly NamedPipeEndPoint _endpoint;
        private readonly NamedPipeTransportOptions _options;
        private bool _hasFirstPipeStarted;
 
        public NamedPipeServerStreamPoolPolicy(NamedPipeEndPoint endpoint, NamedPipeTransportOptions options)
        {
            _endpoint = endpoint;
            _options = options;
        }
 
        public NamedPipeServerStream Create()
        {
            var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
            if (!_hasFirstPipeStarted)
            {
                // The first server stream created should validate that no one else is listening with a given name.
                // Only the first server stream should make this test. The listener will almost always create multiple streams
                // to listen on multiple threads and to handle parallel requests. The pool policy must be updated that the
                // setting isn't needed after the first stream.
                pipeOptions |= NamedPipeOptions.FirstPipeInstance;
            }
            if (_options.CurrentUserOnly)
            {
                pipeOptions |= NamedPipeOptions.CurrentUserOnly;
            }
 
            var context = new CreateNamedPipeServerStreamContext
            {
                NamedPipeEndPoint = _endpoint,
                PipeOptions = pipeOptions,
                PipeSecurity = _options.PipeSecurity
            };
            return _options.CreateNamedPipeServerStream(context);
        }
 
        public bool Return(NamedPipeServerStream obj) => !obj.IsConnected;
 
        public void SetFirstPipeStarted()
        {
            _hasFirstPipeStarted = true;
        }
    }
}