// File: src\Compilers\Server\VBCSCompiler\NamedPipeClientConnectionHost.cs
// Web Access
// Project: src\src\Compilers\Server\VBCSCompiler\AnyCpu\VBCSCompiler.csproj (VBCSCompiler)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.CommandLine;
using Microsoft.CodeAnalysis.Diagnostics;
using Roslyn.Utilities;
namespace Microsoft.CodeAnalysis.CompilerServer
{
    internal sealed class NamedPipeClientConnectionHost : IClientConnectionHost
    {
        /// <summary>
        /// Outcome of one listen attempt: either a connected client or the exception that
        /// prevented the connection. The default value carries neither; 
        /// <see cref="GetNextClientConnectionAsync"/> treats that case as cancellation.
        /// </summary>
        private readonly struct ListenResult
        {
            /// <summary>The connected client, or null when the attempt did not produce one.</summary>
            internal NamedPipeClientConnection? NamedPipeClientConnection { get; }

            /// <summary>The error raised while listening, or null when there was none.</summary>
            internal Exception? Exception { get; }

            /// <summary>
            /// Builds a result from a connection or an exception. Supplying both is a programming
            /// error (checked in debug builds only).
            /// </summary>
            internal ListenResult(NamedPipeClientConnection? connection = null, Exception? exception = null)
            {
                // At most one of the two arguments may be non-null.
                Debug.Assert(!(connection is object && exception is object));

                Exception = exception;
                NamedPipeClientConnection = connection;
            }
        }
 
        // Source of the token handed to the listen tasks. Created in BeginListening; cancelled,
        // disposed and reset to null in EndListening.
        private CancellationTokenSource? _cancellationTokenSource;
        // The tasks started by BeginListening, each running ListenCoreAsync. Null while not listening.
        private Task[]? _listenTasks;
        // Results (connections or errors) enqueued by the listen tasks and dequeued by
        // GetNextClientConnectionAsync. Null while not listening.
        private AsyncQueue<ListenResult>? _queue;
 
        /// <summary>Pipe name supplied to the constructor; passed to every listen task.</summary>
        public string PipeName { get; }
        /// <summary>Logger supplied to the constructor; used by this host and its listen tasks.</summary>
        public ICompilerServerLogger Logger { get; }
        /// <summary>
        /// True from the point <see cref="BeginListening"/> starts setting up until 
        /// <see cref="EndListening"/> has finished tearing down.
        /// </summary>
        public bool IsListening { get; private set; }
 
        /// <summary>
        /// Creates a host for the given pipe. The constructor only records its arguments; listening
        /// starts when <see cref="BeginListening"/> is called.
        /// </summary>
        /// <param name="pipeName">Name of the pipe clients will connect to.</param>
        /// <param name="logger">Logger used for diagnostics.</param>
        internal NamedPipeClientConnectionHost(string pipeName, ICompilerServerLogger logger)
        {
            (PipeName, Logger) = (pipeName, logger);
        }
 
        /// <summary>
        /// Starts the background listen tasks that accept client connections on <see cref="PipeName"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The host is already listening.</exception>
        public void BeginListening()
        {
            if (IsListening)
            {
                throw new InvalidOperationException();
            }

            Debug.Assert(_cancellationTokenSource is null);
            Debug.Assert(_listenTasks is null);
            Debug.Assert(_queue is null);

            IsListening = true;

            var cancellationSource = new CancellationTokenSource();
            _cancellationTokenSource = cancellationSource;

            var results = new AsyncQueue<ListenResult>();
            _queue = results;

            // Four listeners is a somewhat arbitrary cap. The server has to keep up with however many
            // clients msbuild tries to connect, establishing each connection within one second. A single
            // listener is enough in the majority of cases; four leaves headroom for severe load.
            //
            // If this number is ever changed, validate the new value on sufficiently large builds such
            // as dotnet/roslyn or dotnet/runtime.
            var listenerCount = Math.Min(4, Environment.ProcessorCount);
            var listeners = new Task[listenerCount];
            _listenTasks = listeners;

            for (var index = 0; index < listenerCount; index++)
            {
                listeners[index] = Task.Run(() => ListenCoreAsync(PipeName, Logger, results, cancellationSource.Token));
            }
        }
 
        /// <summary>
        /// Stops listening: cancels the listen tasks, blocks until they have finished, completes the
        /// result queue and disposes every connection that was never handed to a caller.
        /// </summary>
        /// <exception cref="InvalidOperationException">The host is not currently listening.</exception>
        public void EndListening()
        {
            if (!IsListening)
            {
                throw new InvalidOperationException();
            }

            Debug.Assert(_cancellationTokenSource is object);
            Debug.Assert(_queue is object);
            Debug.Assert(_listenTasks is object);

            var cancellationSource = _cancellationTokenSource;
            var pendingResults = _queue;
            var listeners = _listenTasks;

            try
            {
                // The listen tasks themselves never throw, but this token flows through the named
                // pipe infrastructure, parts of which hook CancellationToken.Register. Those
                // callbacks can throw while Cancel is running.
                //
                // The most notable case is IOCancellationHelper.Cancel, which has a race where it
                // tries to cancel IO on an already disposed SafeHandle; the resulting
                // ObjectDisposedException propagates out of Cancel.
                //
                // There is no good way to guard against that, so it is accepted as a possible
                // outcome and only logged.
                cancellationSource.Cancel();
            }
            catch (Exception ex)
            {
                Logger.LogException(ex, $"Cancelling server listens threw an exception");
            }

            try
            {
                Task.WaitAll(listeners);
            }
            catch (Exception ex)
            {
                Logger.LogException(ex, $"Listen tasks threw exception during {nameof(EndListening)}");
            }

            pendingResults.Complete();
            pendingResults.WhenCompletedTask.Wait();

            // Whatever is still queued after completion will never reach a client, so the host
            // is responsible for releasing those connections.
            while (pendingResults.TryDequeue(out var leftover))
            {
                leftover.NamedPipeClientConnection?.Dispose();
            }

            _queue = null;
            cancellationSource.Dispose();
            _cancellationTokenSource = null;
            _listenTasks = null;
            IsListening = false;
        }
 
        /// <summary>
        /// Waits for the next result produced by the listen tasks and returns its connection.
        /// </summary>
        /// <returns>The next connected client.</returns>
        /// <exception cref="InvalidOperationException">The host is not currently listening.</exception>
        /// <exception cref="OperationCanceledException">
        /// The dequeued result carried neither a connection nor an error.
        /// </exception>
        /// <exception cref="Exception">
        /// A listen task reported an error; the original error is the inner exception.
        /// </exception>
        public async Task<IClientConnection> GetNextClientConnectionAsync()
        {
            if (!IsListening)
            {
                throw new InvalidOperationException();
            }

            Debug.Assert(_cancellationTokenSource is object);
            Debug.Assert(_queue is object);

            var next = await _queue.DequeueAsync(_cancellationTokenSource.Token).ConfigureAwait(false);

            if (next.Exception is { } listenError)
            {
                throw new Exception("Error occurred listening for connections", listenError);
            }

            // When Complete is called, AsyncQueue<> resolves all outstanding waiters with the default
            // value, which has no connection. Callers should see that as cancellation.
            return next.NamedPipeClientConnection ?? throw new OperationCanceledException();
        }
 
        /// <summary>
        /// Runs one listener loop. Each iteration creates a <see cref="NamedPipeServerStream"/> for
        /// <paramref name="pipeName"/>, waits for a client to connect and enqueues the outcome on
        /// <paramref name="queue"/>. The loop exits once <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <remarks>
        /// Connection errors are caught inside the loop: they are logged and enqueued as a
        /// <see cref="ListenResult"/> carrying the exception, after which the loop tries again.
        /// Cancellation is logged and the pending pipe is disposed without enqueuing anything.
        /// </remarks>
        /// <param name="pipeName">Name of the pipe to create server streams for.</param>
        /// <param name="logger">Receives progress and error messages.</param>
        /// <param name="queue">Receives one <see cref="ListenResult"/> per connection or error.</param>
        /// <param name="cancellationToken">Used to cancel the connection sequence and end the loop.</param>
        private static async Task ListenCoreAsync(
            string pipeName,
            ICompilerServerLogger logger,
            AsyncQueue<ListenResult> queue,
            CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                NamedPipeServerStream? pipeStream = null;

                try
                {
                    // Create the pipe and begin waiting for a connection. This 
                    // doesn't block, but could fail in certain circumstances, such
                    // as Windows refusing to create the pipe for some reason 
                    // (out of handles?), or the pipe was disconnected before we 
                    // started listening
                    logger.Log($"Constructing pipe and waiting for connections '{pipeName}'");
                    pipeStream = NamedPipeUtil.CreateServer(pipeName);

                    // The WaitForConnectionAsync API does not fully respect the provided CancellationToken
                    // on all platforms:
                    //
                    //  https://github.com/dotnet/runtime/issues/40289
                    //
                    // To mitigate this we need to setup a cancellation Task and dispose the NamedPipeServerStream
                    // if it ever completes. Once all of the NamedPipeServerStream for the given pipe name are
                    // disposed they will all exit the WaitForConnectionAsync method
                    var connectTask = pipeStream.WaitForConnectionAsync(cancellationToken);
                    if (!PlatformInformation.IsWindows)
                    {
                        // An infinite delay that only ever completes through cancellation.
                        var cancelTask = Task.Delay(TimeSpan.FromMilliseconds(-1), cancellationToken);
                        var completedTask = await Task.WhenAny(new[] { connectTask, cancelTask }).ConfigureAwait(false);
                        if (completedTask == cancelTask)
                        {
                            // Handled by the catch below, which disposes the stream.
                            throw new OperationCanceledException();
                        }
                    }

                    await connectTask.ConfigureAwait(false);
                    logger.Log("Pipe connection established.");
                    var connection = new NamedPipeClientConnection(pipeStream, logger);
                    queue.Enqueue(new ListenResult(connection: connection));
                }
                catch (OperationCanceledException)
                {
                    // Expected when the host is shutting down.
                    logger.Log($"Pipe connection cancelled");
                    pipeStream?.Dispose();
                }
                catch (Exception ex)
                {
                    logger.LogException(ex, $"Pipe connection error");

                    try
                    {
                        queue.Enqueue(new ListenResult(exception: ex));
                    }
                    finally
                    {
                        // Release the pipe even if publishing the failure itself throws, so the
                        // handle cannot leak on this path.
                        pipeStream?.Dispose();
                    }
                }
            }
        }
    }
}