File: NodePipeBase.cs
Web Access
Project: ..\..\..\src\Tasks\Microsoft.Build.Tasks.csproj (Microsoft.Build.Tasks.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using Microsoft.Build.BackEnd;
using Microsoft.Build.Framework;
 
#if !TASKHOST
using System.Buffers.Binary;
using System.Threading.Tasks;
using Microsoft.Build.Eventing;
#endif
 
namespace Microsoft.Build.Internal
{
    internal abstract class NodePipeBase : IDisposable
    {
        /// <summary>
        /// A packet header consists of 1 byte (enum) for the packet type + 4 bytes (int32) for the packet length.
        /// </summary>
        private const int HeaderLength = 5;
 
        /// <summary>
        /// The size of the intermediate in-memory buffers.
        /// </summary>
        private const int InitialBufferSize = 131_072;
 
        /// <summary>
        /// The maximum number of bytes to write in a single operation.
        /// </summary>
        private const int MaxPacketWriteSize = 104_8576;
 
        /// <summary>
        /// A reusable buffer for reading the packet header.
        /// </summary>
        private readonly byte[] _headerData = new byte[HeaderLength];
 
        /// <summary>
        /// A buffer typically big enough to handle a packet body.
        /// We use this as a convenient way to manage and cache a byte[] that's resized
        /// automatically to fit our payload.
        /// </summary>
        private readonly MemoryStream _readBuffer = new(InitialBufferSize);
 
        /// <summary>
        /// A buffer typically big enough to handle a packet body.
        /// We use this as a convenient way to manage and cache a byte[] that's resized
        /// automatically to fit our payload.
        /// </summary>
        private readonly MemoryStream _writeBuffer = new(InitialBufferSize);
 
        private readonly ITranslator _readTranslator;
 
        private readonly ITranslator _writeTranslator;
 
        /// <summary>
        /// The packet factory to be used for deserialization, as packet types may have custom factory logic.
        /// </summary>
        private INodePacketFactory? _packetFactory;
 
        protected NodePipeBase(string pipeName, Handshake handshake)
        {
            PipeName = pipeName;
            HandshakeComponents = handshake.RetrieveHandshakeComponents();
            _readTranslator = BinaryTranslator.GetReadTranslator(_readBuffer, InterningBinaryReader.CreateSharedBuffer());
            _writeTranslator = BinaryTranslator.GetWriteTranslator(_writeBuffer);
        }
 
        /// <summary>
        /// Gets a value indicating whether the pipe is in the connected state. Note that this is not real-time and
        /// will only be updated when an operation on the pipe fails.
        /// When a pipe is broken, Disconnect() must be called for the pipe to be reused - otherwise any attempts to
        /// connect to a new client will throw.
        /// </summary>
        internal bool IsConnected => NodeStream.IsConnected;
 
        protected abstract PipeStream NodeStream { get; }
 
        protected string PipeName { get; }
 
        protected int[] HandshakeComponents { get; }
 
        public void Dispose()
        {
            _readBuffer.Dispose();
            _writeBuffer.Dispose();
            _readTranslator.Dispose();
            _writeTranslator.Dispose();
            NodeStream.Dispose();
        }
 
        internal void RegisterPacketFactory(INodePacketFactory packetFactory) => _packetFactory = packetFactory;
 
        internal void WritePacket(INodePacket packet)
        {
            int messageLength = WritePacketToBuffer(packet);
            byte[] buffer = _writeBuffer.GetBuffer();
 
            for (int i = 0; i < messageLength; i += MaxPacketWriteSize)
            {
                int lengthToWrite = Math.Min(messageLength - i, MaxPacketWriteSize);
                NodeStream.Write(buffer, i, lengthToWrite);
            }
        }
 
        internal INodePacket ReadPacket()
        {
            // Read the header.
            int headerBytesRead = Read(_headerData, HeaderLength);
 
            // When an active connection is broken, any pending read will return 0 bytes before the pipe transitions to
            // the broken state. As this is expected behavior, don't throw an exception if no packet is pending, A node
            // may disconnect without waiting on the other end to gracefully cancel, and the caller can decide whether
            // this was intentional.
            if (headerBytesRead == 0)
            {
                return new NodeShutdown(NodeShutdownReason.ConnectionFailed);
            }
            else if (headerBytesRead != HeaderLength)
            {
                throw new IOException($"Incomplete header read.  {headerBytesRead} of {HeaderLength} bytes read.");
            }
 
#if TASKHOST
            int packetLength = BitConverter.ToInt32(_headerData, 1);
#else
            int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerData, 1, 4));
            MSBuildEventSource.Log.PacketReadSize(packetLength);
#endif
 
            // Read the packet. Set the buffer length now to avoid additional resizing during the read.
            _readBuffer.Position = 0;
            _readBuffer.SetLength(packetLength);
            int packetBytesRead = Read(_readBuffer.GetBuffer(), packetLength);
 
            if (packetBytesRead < packetLength)
            {
                throw new IOException($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read.");
            }
 
            return DeserializePacket();
        }
 
#if !TASKHOST
        internal async Task WritePacketAsync(INodePacket packet, CancellationToken cancellationToken = default)
        {
            int messageLength = WritePacketToBuffer(packet);
            byte[] buffer = _writeBuffer.GetBuffer();
 
            for (int i = 0; i < messageLength; i += MaxPacketWriteSize)
            {
                int lengthToWrite = Math.Min(messageLength - i, MaxPacketWriteSize);
#if NET
                await NodeStream.WriteAsync(buffer.AsMemory(i, lengthToWrite), cancellationToken).ConfigureAwait(false);
#else
                await NodeStream.WriteAsync(buffer, i, lengthToWrite, cancellationToken).ConfigureAwait(false);
#endif
            }
        }
 
        internal async Task<INodePacket> ReadPacketAsync(CancellationToken cancellationToken = default)
        {
            // Read the header.
            int headerBytesRead = await ReadAsync(_headerData, HeaderLength, cancellationToken).ConfigureAwait(false);
 
            // When an active connection is broken, any pending read will return 0 bytes before the pipe transitions to
            // the broken state. As this is expected behavior, don't throw an exception if no packet is pending, A node
            // may disconnect without waiting on the other end to gracefully cancel, and the caller can decide whether
            // this was intentional.
            if (headerBytesRead == 0)
            {
                return new NodeShutdown(NodeShutdownReason.ConnectionFailed);
            }
            else if (headerBytesRead != HeaderLength)
            {
                throw new IOException($"Incomplete header read.  {headerBytesRead} of {HeaderLength} bytes read.");
            }
 
            int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerData, 1, 4));
            MSBuildEventSource.Log.PacketReadSize(packetLength);
 
            // Read the packet. Set the buffer length now to avoid additional resizing during the read.
            _readBuffer.Position = 0;
            _readBuffer.SetLength(packetLength);
            int packetBytesRead = await ReadAsync(_readBuffer.GetBuffer(), packetLength, cancellationToken).ConfigureAwait(false);
 
            if (packetBytesRead < packetLength)
            {
                throw new IOException($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read.");
            }
 
            return DeserializePacket();
        }
#endif
 
        private int WritePacketToBuffer(INodePacket packet)
        {
            // Clear the buffer but keep the underlying capacity to avoid reallocations.
            _writeBuffer.SetLength(HeaderLength);
            _writeBuffer.Position = HeaderLength;
 
            // Serialize and write the packet to the buffer.
            packet.Translate(_writeTranslator);
 
            // Write the header to the buffer.
            _writeBuffer.Position = 0;
            _writeBuffer.WriteByte((byte)packet.Type);
            int messageLength = (int)_writeBuffer.Length;
            _writeTranslator.Writer.Write(messageLength - HeaderLength);
 
            return messageLength;
        }
 
        private int Read(byte[] buffer, int bytesToRead)
        {
            int totalBytesRead = 0;
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = NodeStream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
 
                // 0 byte read will occur if the pipe disconnects.
                if (bytesRead == 0)
                {
                    break;
                }
 
                totalBytesRead += bytesRead;
            }
 
            return totalBytesRead;
        }
 
#if !TASKHOST
        private async ValueTask<int> ReadAsync(byte[] buffer, int bytesToRead, CancellationToken cancellationToken)
        {
            int totalBytesRead = 0;
            while (totalBytesRead < bytesToRead)
            {
#if NET
                int bytesRead = await NodeStream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead), cancellationToken).ConfigureAwait(false);
#else
                int bytesRead = await NodeStream.ReadAsync(buffer, totalBytesRead, bytesToRead - totalBytesRead, cancellationToken).ConfigureAwait(false);
#endif
 
                // 0 byte read will occur if the pipe disconnects.
                if (bytesRead == 0)
                {
                    break;
                }
 
                totalBytesRead += bytesRead;
            }
 
            return totalBytesRead;
        }
#endif
 
        private INodePacket DeserializePacket()
        {
            if (_packetFactory == null)
            {
                throw new InternalErrorException("No packet factory is registered for deserialization.");
            }
 
            NodePacketType packetType = (NodePacketType)_headerData[0];
            try
            {
                return _packetFactory.DeserializePacket(packetType, _readTranslator);
            }
            catch (Exception e) when (e is not InternalErrorException)
            {
                throw new InternalErrorException($"Exception while deserializing packet {packetType}: {e}");
            }
        }
    }
}