File: CommandTests\Test\NamedPipeClient.cs
Web Access
Project: ..\..\..\test\dotnet.Tests\dotnet.Tests.csproj (dotnet.Tests)
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 
namespace dotnet.Tests.CommandTests.Test;
 
using System.Buffers;
using System.IO.Pipes;
using Microsoft.DotNet.Cli.Commands.Test.IPC;
 
/// <summary>
/// Client side of a length-prefixed request/reply exchange over a named pipe.
/// </summary>
/// <remarks>
/// Every message written and read by this class has the layout:
/// a 4-byte message size, a 4-byte serializer id, then the serialized body.
/// The size counts the serializer id and the body, but not the size field itself.
/// Integers are encoded with <see cref="BitConverter"/>, i.e. in the machine's byte order.
/// </remarks>
internal sealed class NamedPipeClient : NamedPipeBase
{
    private readonly NamedPipeClientStream _namedPipeClientStream;

    // Serializes request/reply exchanges: the buffers below are shared instance state.
    private readonly SemaphoreSlim _lock = new(1, 1);

    // Scratch buffers reused across calls. Only their Position is reset between uses,
    // so always slice by Position, never by Length.
    private readonly MemoryStream _serializationBuffer = new();
    private readonly MemoryStream _messageBuffer = new();
    private readonly byte[] _readBuffer = new byte[250000];

    private bool _disposed;

    /// <summary>
    /// Creates a client for the pipe <paramref name="name"/> on the local machine (server name ".").
    /// The connection is not opened until <see cref="ConnectAsync"/> is called.
    /// </summary>
    /// <param name="name">Name of the pipe to connect to.</param>
    public NamedPipeClient(string name)
    {
        _namedPipeClientStream = new(".", name, PipeDirection.InOut, PipeOptions.CurrentUserOnly);
        PipeName = name;
    }

    /// <summary>Gets the pipe name this client was created with.</summary>
    public string PipeName { get; }

    /// <summary>Connects to the pipe server.</summary>
    /// <param name="cancellationToken">Token used to cancel the connection attempt.</param>
    public async Task ConnectAsync(CancellationToken cancellationToken)
        => await _namedPipeClientStream.ConnectAsync(cancellationToken).ConfigureAwait(false);

    /// <summary>
    /// Sends <paramref name="request"/> and waits for the matching response.
    /// Only one exchange runs at a time; concurrent callers wait on an internal lock.
    /// </summary>
    /// <typeparam name="TRequest">Request type; selects the serializer used for the request.</typeparam>
    /// <typeparam name="TResponse">Type the deserialized response is cast to.</typeparam>
    /// <param name="request">The request to send.</param>
    /// <param name="cancellationToken">Token observed while waiting for the lock and during pipe I/O.</param>
    /// <returns>The deserialized response.</returns>
    /// <exception cref="InvalidDataException">The response announced a size too small to hold a serializer id.</exception>
    /// <remarks>
    /// If the pipe reaches end-of-stream while the response is being read, the process is
    /// terminated with <see cref="Environment.FailFast(string)"/>.
    /// </remarks>
    public async Task<TResponse> RequestReplyAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken)
       where TRequest : IRequest
       where TResponse : IResponse
    {
        await _lock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await SendRequestAsync(request, cancellationToken).ConfigureAwait(false);
            return await ReadResponseAsync<TResponse>(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            _lock.Release();
        }
    }

    /// <summary>
    /// Releases the lock, the scratch buffers and the pipe stream. Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _lock.Dispose();
        _serializationBuffer.Dispose();
        _messageBuffer.Dispose();
        _namedPipeClientStream.Dispose();
        _disposed = true;
    }

    // Serializes the request, frames it (size + serializer id + body) and writes it to the pipe.
    private async Task SendRequestAsync<TRequest>(TRequest request, CancellationToken cancellationToken)
        where TRequest : IRequest
    {
        INamedPipeSerializer requestNamedPipeSerializer = GetSerializer(typeof(TRequest));

        _serializationBuffer.Position = 0;
        _messageBuffer.Position = 0;
        try
        {
            // Ask the serializer to produce the body.
            requestNamedPipeSerializer.Serialize(request, _serializationBuffer);
            int bodySize = (int)_serializationBuffer.Position;

            // The announced size covers the serializer id (an int) followed by the body.
            await WriteInt32ToMessageBufferAsync(bodySize + sizeof(int), cancellationToken).ConfigureAwait(false);
            await WriteInt32ToMessageBufferAsync(requestNamedPipeSerializer.Id, cancellationToken).ConfigureAwait(false);
            await _messageBuffer.WriteAsync(_serializationBuffer.GetBuffer().AsMemory(0, bodySize), cancellationToken).ConfigureAwait(false);

            // Send the whole frame in one write.
            await _namedPipeClientStream.WriteAsync(_messageBuffer.GetBuffer().AsMemory(0, (int)_messageBuffer.Position), cancellationToken).ConfigureAwait(false);
            await _namedPipeClientStream.FlushAsync(cancellationToken).ConfigureAwait(false);
            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                _namedPipeClientStream.WaitForPipeDrain();
            }
        }
        finally
        {
            // Leave both buffers rewound, whether or not the send succeeded.
            _messageBuffer.Position = 0;
            _serializationBuffer.Position = 0;
        }
    }

    // Appends a 4-byte int to _messageBuffer using a pooled scratch array.
    private async Task WriteInt32ToMessageBufferAsync(int value, CancellationToken cancellationToken)
    {
        byte[] bytes = ArrayPool<byte>.Shared.Rent(sizeof(int));
        try
        {
            BitConverter.TryWriteBytes(bytes, value);
            await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(bytes);
        }
    }

    // Reads one framed response from the pipe and deserializes it.
    private async Task<TResponse> ReadResponseAsync<TResponse>(CancellationToken cancellationToken)
        where TResponse : IResponse
    {
        // A single pipe read may return fewer bytes than requested, so keep reading until the
        // 4-byte size prefix is complete instead of decoding whatever the first read delivered.
        int headerBytesRead = 0;
        while (headerBytesRead < sizeof(int))
        {
            headerBytesRead += await ReadOrFailFastAsync(
                _readBuffer.AsMemory(headerBytesRead, sizeof(int) - headerBytesRead),
                cancellationToken).ConfigureAwait(false);
        }

        int messageSize = BitConverter.ToInt32(_readBuffer, 0);
        if (messageSize < sizeof(int))
        {
            // A valid message always contains at least the serializer id.
            throw new InvalidDataException($"[NamedPipeClient] Invalid message size received: {messageSize}.");
        }

        _messageBuffer.Position = 0;
        try
        {
            // Never ask for more than what is still missing, so bytes that belong to a
            // later message are left in the pipe.
            int missingBytes = messageSize;
            while (missingBytes > 0)
            {
                int chunkSize = Math.Min(missingBytes, _readBuffer.Length);
                int readBytes = await ReadOrFailFastAsync(_readBuffer.AsMemory(0, chunkSize), cancellationToken).ConfigureAwait(false);
                await _messageBuffer.WriteAsync(_readBuffer.AsMemory(0, readBytes), cancellationToken).ConfigureAwait(false);
                missingBytes -= readBytes;
            }

            // The payload starts with the serializer id; the body follows it.
            int serializerId = BitConverter.ToInt32(_messageBuffer.GetBuffer(), 0);
            INamedPipeSerializer responseNamedPipeSerializer = GetSerializer(serializerId);

            _messageBuffer.Position = sizeof(int); // Skip the serializer id
            return (TResponse)responseNamedPipeSerializer.Deserialize(_messageBuffer);
        }
        finally
        {
            // Reset the message buffer
            _messageBuffer.Position = 0;
        }
    }

    // Reads at least one byte into destination and returns the number of bytes read.
    private async Task<int> ReadOrFailFastAsync(Memory<byte> destination, CancellationToken cancellationToken)
    {
        int readBytes = await _namedPipeClientStream.ReadAsync(destination, cancellationToken).ConfigureAwait(false);
        if (readBytes == 0)
        {
            // We are reading a message response.
            // If we cannot get a response, there is no way we can recover and continue executing.
            // This can happen if the other process gets killed or crashes while it's sending the response.
            // This is especially important for 'dotnet test', where the user can simply kill the dotnet.exe process themselves.
            // In that case, we want the MTP process to also die.
            Environment.FailFast("[NamedPipeClient] Connection lost with the other side.");
        }

        return readBytes;
    }
}