File: Commands\Test\MTP\IPC\NamedPipeServer.cs
Web Access
Project: ..\..\..\src\Cli\dotnet\dotnet.csproj (dotnet)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO.Pipes;
 
namespace Microsoft.DotNet.Cli.Commands.Test.IPC;
 
/// <summary>
/// Server end of a named-pipe channel. After a client connects, a background loop reads
/// size-prefixed request messages from the pipe, passes each deserialized request to a callback,
/// and writes the callback's response back to the pipe using the same framing.
/// </summary>
internal sealed class NamedPipeServer : NamedPipeBase
{
    // Platform check based on the directory separator character; consulted by GetPipeName.
    private static bool IsUnix => Path.DirectorySeparatorChar == '/';
 
    // Invoked once per fully received request; its result is serialized and sent back to the client.
    private readonly Func<NamedPipeServer, IRequest, Task<IResponse>> _callback;
    private readonly NamedPipeServerStream _namedPipeServerStream;
    // Token handed to the read loop; also used to recognize "our" cancellation in WaitConnectionAsync.
    private readonly CancellationToken _cancellationToken;
    // Scratch stream the response serializer writes into before the response is framed.
    private readonly MemoryStream _serializationBuffer = new();
    // Accumulates the payload of the incoming message; afterwards reused to assemble the outgoing frame.
    private readonly MemoryStream _messageBuffer = new();
    // Destination buffer for every read from the pipe.
    private readonly byte[] _readBuffer = new byte[250000];
    // Reusable 4-byte scratch array for encoding the int fields (message size, serializer id) of a response.
    private readonly byte[] _sizeOfIntArray = new byte[sizeof(int)];
    // Forwarded to GetSerializer when resolving the request serializer.
    // NOTE(review): presumably controls how unknown serializer ids are handled - confirm in NamedPipeBase.
    private readonly bool _skipUnknownMessages;
    // The background task running InternalLoopAsync; stays null until WaitConnectionAsync has accepted a client.
    private Task? _loopTask;
    private bool _disposed;
 
    /// <summary>
    /// Creates the underlying pipe stream (bidirectional, byte transmission mode, asynchronous,
    /// restricted to the current user). No connection is awaited here; call
    /// <see cref="WaitConnectionAsync"/> for that.
    /// </summary>
    /// <param name="pipeName">Name passed to the <see cref="NamedPipeServerStream"/> constructor.</param>
    /// <param name="callback">Handler invoked with this server and each deserialized request; returns the response to send.</param>
    /// <param name="maxNumberOfServerInstances">Passed to the <see cref="NamedPipeServerStream"/> constructor as the maximum number of server instances.</param>
    /// <param name="cancellationToken">Token used by the message loop for its reads and writes.</param>
    /// <param name="skipUnknownMessages">Forwarded to the request serializer lookup.</param>
    public NamedPipeServer(
        string pipeName,
        Func<NamedPipeServer, IRequest, Task<IResponse>> callback,
        int maxNumberOfServerInstances,
        CancellationToken cancellationToken,
        bool skipUnknownMessages)
    {
        _namedPipeServerStream = new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly,
            inBufferSize: 0,
            outBufferSize: 0);
 
        _callback = callback;
        _cancellationToken = cancellationToken;
        _skipUnknownMessages = skipUnknownMessages;
    }
 
    /// <summary>
    /// Gets a value indicating whether a client connection has been accepted by <see cref="WaitConnectionAsync"/>.
    /// <see cref="Dispose"/> uses this to decide whether it has to wait for the message loop to finish.
    /// </summary>
    public bool WasConnected { get; private set; }
 
    /// <summary>
    /// Waits for a client to connect, marks the server as connected and starts the message loop
    /// as a background task.
    /// </summary>
    /// <param name="cancellationToken">
    /// Token for the connection wait; it is also passed to <see cref="Task.Run(Func{Task}, CancellationToken)"/>.
    /// The loop itself runs with the token supplied to the constructor.
    /// </param>
    public async Task WaitConnectionAsync(CancellationToken cancellationToken)
    {
        await _namedPipeServerStream.WaitForConnectionAsync(cancellationToken);
        // WasConnected is set before _loopTask is assigned; Dispose relies on _loopTask being non-null
        // once WasConnected is true.
        WasConnected = true;
        _loopTask = Task.Run(
            async () =>
            {
                try
                {
                    await InternalLoopAsync(_cancellationToken);
                }
                catch (OperationCanceledException ex) when (ex.CancellationToken == _cancellationToken)
                {
                    // We are being cancelled, so we don't need to wait anymore.
                    // Cancellations carrying any other token are not swallowed and fault the loop task.
                    return;
                }
            }, cancellationToken);
    }
 
    /// <summary>
    /// Reads requests from the pipe and answers each one until the client disconnects or
    /// cancellation is requested. Requests and responses share this wire format:
    /// 4 bytes = message size
    /// ------- Payload -------
    /// 4 bytes = serializer id
    /// x bytes = object buffer.
    /// The message size counts the payload only (serializer id + object buffer).
    /// </summary>
    /// <param name="cancellationToken">Checked at the top of every iteration and passed to all reads and writes.</param>
    /// <exception cref="UnreachableException">
    /// More bytes were received than the current message declared, i.e. messages overlap in the pipe.
    /// </exception>
    private async Task InternalLoopAsync(CancellationToken cancellationToken)
    {
        // This is an indicator when reading from the pipe whether we are at the start of a new message (i.e, we should read 4 bytes as message size)
        // Note that the implementation assumes no overlapping messages in the pipe.
        // The flow goes like:
        // 1. MTP sends a request (and acquires lock).
        // 2. SDK reads the request.
        // 3. SDK sends a response.
        // 4. MTP reads the response (and releases lock).
        // This means that no two requests can be in the pipe at the same time.
        bool isStartOfNewMessage = true;
        // Number of payload bytes of the current message that have not been received yet.
        int remainingBytesToReadOfWholeMessage = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            // If we are at the start of a new message, we need to read at least the message size.
            int currentReadBytes = isStartOfNewMessage
                ? await _namedPipeServerStream.ReadAtLeastAsync(_readBuffer, minimumBytes: sizeof(int), throwOnEndOfStream: false, cancellationToken)
                : await _namedPipeServerStream.ReadAsync(_readBuffer, cancellationToken);
 
            // Zero bytes, or fewer than the 4 size bytes at the start of a message, is treated as end of stream.
            if (currentReadBytes == 0 || (isStartOfNewMessage && currentReadBytes < sizeof(int)))
            {
                // The client has disconnected
                return;
            }
 
            // The local remainingBytesToProcess tracks the remaining bytes of what we have read from the pipe but not yet processed.
            // At the beginning here, it contains everything we have read from the pipe.
            // As we are processing the data in it, we continue to slice it.
            Memory<byte> remainingBytesToProcess = _readBuffer.AsMemory(0, currentReadBytes);
 
            // If the current read is the start of a new message, we need to read the message size first.
            if (isStartOfNewMessage)
            {
                // We need to read the message size, first 4 bytes
                remainingBytesToReadOfWholeMessage = BitConverter.ToInt32(remainingBytesToProcess.Span);
 
                // Now that we have read the size, we slice the remainingBytesToProcess.
                remainingBytesToProcess = remainingBytesToProcess.Slice(sizeof(int));
 
                // Now that we have read the size, we are no longer at the start of a new message.
                // If the current chunk ended up to be the full message, we will set this back to true later.
                isStartOfNewMessage = false;
            }
 
            // We read the rest of the message.
            // Note that this assumes that no messages are overlapping in the pipe.
            if (remainingBytesToProcess.Length > 0)
            {
                // Append this chunk of payload to the message buffer and account for it.
                await _messageBuffer.WriteAsync(remainingBytesToProcess, cancellationToken);
                remainingBytesToReadOfWholeMessage -= remainingBytesToProcess.Length;
 
                // At this point, we have read everything in the remainingBytesToProcess.
                // Note that while remainingBytesToProcess isn't accessed after this point, we still maintain the
                // invariant that it tracks what we have read from the pipe but not yet processed.
                remainingBytesToProcess = Memory<byte>.Empty;
            }
 
            // A negative count means this read delivered more bytes than the message declared,
            // which violates the "no overlapping messages" assumption above.
            if (remainingBytesToReadOfWholeMessage < 0)
            {
                throw new UnreachableException(CliCommandStrings.DotnetTestPipeOverlapping);
            }
 
            // If we have read all the message, we can deserialize it
            if (remainingBytesToReadOfWholeMessage == 0)
            {
                // Rewind to the start of the accumulated payload
                _messageBuffer.Position = 0;
 
                // Get the serializer id (the first 4 bytes of the payload)
                int serializerId = BitConverter.ToInt32(_messageBuffer.GetBuffer(), 0);
 
                // Get the serializer
                // NOTE(review): GetSerializer is not defined in this class - presumably inherited from NamedPipeBase.
                INamedPipeSerializer requestNamedPipeSerializer = GetSerializer(serializerId, _skipUnknownMessages);
 
                // Deserialize the message
                _messageBuffer.Position += sizeof(int); // Skip the serializer id
                var deserializedObject = (IRequest)requestNamedPipeSerializer.Deserialize(_messageBuffer);
 
                // Call the callback
                IResponse response = await _callback(this, deserializedObject);
 
                // Rewind the message buffer again: from here on it is reused to assemble the outgoing response frame.
                _messageBuffer.Position = 0;
 
                // Get the response serializer
                INamedPipeSerializer responseNamedPipeSerializer = GetSerializer(response.GetType());
 
                // Serialize the response
                responseNamedPipeSerializer.Serialize(response, _serializationBuffer);
 
                // The length of the message is the size of the serialized response plus four bytes to store the serializer id
                // Space for the message
                int sizeOfTheWholeMessage = (int)_serializationBuffer.Position;
 
                // Space for the serializer id
                sizeOfTheWholeMessage += sizeof(int);
 
                // Write the message size
                byte[] bytes = _sizeOfIntArray;
                if (!BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage))
                {
                    throw new UnreachableException();
                }
 
                await _messageBuffer.WriteAsync(bytes, cancellationToken);
 
                // Write the serializer id (the same scratch array is reused; the size was already copied into the message buffer)
                bytes = _sizeOfIntArray;
                if (!BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id))
                {
                    throw new UnreachableException();
                }
 
                await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken);
 
                // Write the message
                await _messageBuffer.WriteAsync(_serializationBuffer.GetBuffer().AsMemory(0, (int)_serializationBuffer.Position), cancellationToken);
 
                // Send the message
                try
                {
                    await _namedPipeServerStream.WriteAsync(_messageBuffer.GetBuffer().AsMemory(0, (int)_messageBuffer.Position), cancellationToken);
                    await _namedPipeServerStream.FlushAsync(cancellationToken);
                }
                finally
                {
                    // Reset the buffers, even if sending failed, so they start at position 0 for the next message
                    _messageBuffer.Position = 0;
                    _serializationBuffer.Position = 0;
                }
 
                // Reset the control variables
                isStartOfNewMessage = true;
                remainingBytesToReadOfWholeMessage = 0;
            }
        }
    }
 
    /// <summary>
    /// Builds the pipe name to use for the given logical name.
    /// </summary>
    /// <param name="name">Logical name of the pipe.</param>
    /// <returns>
    /// On non-Unix platforms, <c>testingplatform.pipe.</c> followed by <paramref name="name"/> with every
    /// backslash replaced by a dot; on Unix, <paramref name="name"/> combined with the <c>/tmp</c> directory.
    /// </returns>
    public static string GetPipeName(string name)
    {
        if (!IsUnix)
        {
            return $"testingplatform.pipe.{name.Replace('\\', '.')}";
        }
 
        // Similar to https://github.com/dotnet/roslyn/blob/99bf83c7bc52fa1ff27cf792db38755d5767c004/src/Compilers/Shared/NamedPipeUtil.cs#L26-L42
        return Path.Combine("/tmp", name);
    }
 
    /// <summary>
    /// Waits (up to 90 seconds) for the message loop to finish if a client was connected, then disposes
    /// the pipe stream. Calling it again after the first call is a no-op.
    /// </summary>
    /// <exception cref="InvalidOperationException">The message loop did not finish within 90 seconds.</exception>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
 
        try
        {
            if (WasConnected)
            {
                // If the loop task is null at this point we have a race condition: the task was not assigned yet and we are already disposing.
                // This is unexpected, and dereferencing the null task below throws.
 
                // To close gracefully we need to ensure that the client closed the stream, which makes
                // InternalLoopAsync return through its "client has disconnected" branch.
                if (!_loopTask!.Wait(TimeSpan.FromSeconds(90)))
                {
                    throw new InvalidOperationException(CliCommandStrings.InternalLoopAsyncDidNotExitSuccessfullyErrorMessage);
                }
            }
        }
        finally
        {
            // Ensure we are still disposing the resources correctly, even if _loopTask completes with
            // an exception, or if the task doesn't complete within the 90 seconds limit.
            _namedPipeServerStream.Dispose();
            _disposed = true;
        }
    }
}