File: BackEnd\Client\MSBuildClientPacketPump.cs
Web Access
Project: ..\..\..\src\Build\Microsoft.Build.csproj (Microsoft.Build)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
 
namespace Microsoft.Build.BackEnd.Client
{
    internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketFactory, IDisposable
    {
        /// <summary>
        /// The queue of packets we have received but which have not yet been processed.
        /// Packets are appended by <see cref="PacketReceived"/>.
        /// </summary>
        public ConcurrentQueue<INodePacket> ReceivedPacketsQueue { get; }
 
        /// <summary>
        /// Set each time <see cref="PacketReceived"/> has put a packet into <see cref="ReceivedPacketsQueue"/>.
        /// </summary>
        public AutoResetEvent PacketReceivedEvent { get; }
 
        /// <summary>
        /// Set when the packet pump read loop terminates, whether it ended normally or because of an exception.
        /// </summary>
        public ManualResetEvent PacketPumpCompleted { get; }
 
        /// <summary>
        /// The exception that made the packet pump terminate unexpectedly (for example due to connection problems
        /// or deserialization issues). The read loop stores the exception here instead of rethrowing it.
        /// </summary>
        public Exception? PacketPumpException { get; set; }
 
        /// <summary>
        /// Set by <see cref="Stop"/> to request that the packet pump shuts down.
        /// </summary>
        private readonly ManualResetEvent _packetPumpShutdownEvent;
 
        /// <summary>
        /// The packet factory used to register handlers and to deserialize and route packets.
        /// </summary>
        private readonly NodePacketFactory _packetFactory;
 
        /// <summary>
        /// The memory stream backing the read buffer. Packet bodies are read into its buffer and
        /// <see cref="_binaryReadTranslator"/> deserializes from it.
        /// </summary>
        private readonly MemoryStream _readBufferMemoryStream;
 
        /// <summary>
        /// The thread which runs the asynchronous packet pump. Created by <see cref="Start"/>.
        /// </summary>
        private Thread? _packetPumpThread;
 
        /// <summary>
        /// The stream from which packets are read.
        /// </summary>
        private readonly Stream _stream;
 
        /// <summary>
        /// The binary translator for reading packets out of <see cref="_readBufferMemoryStream"/>.
        /// </summary>
        private readonly ITranslator _binaryReadTranslator;
 
        /// <summary>
        /// True once <see cref="ServerWillDisconnect"/> has been called: the last packet has been sent to the
        /// server side and the server is expected to break the pipe connection soon - unless the client does it first.
        /// While set, a zero-byte header read ends the read loop quietly instead of being reported as an error.
        /// </summary>
        // NOTE(review): written through ServerWillDisconnect and read inside the read loop, which runs on the
        // pump thread; the field is not volatile - confirm that cross-thread visibility is adequate.
        private bool _isServerDisconnecting;
 
        /// <summary>
        /// Creates a packet pump that reads packets from the given stream.
        /// Reading does not begin until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="stream">The stream from which packets are read. Must not be null.</param>
        public MSBuildClientPacketPump(Stream stream)
        {
            ErrorUtilities.VerifyThrowArgumentNull(stream);

            // Signalling primitives and the hand-off queue for received packets.
            _packetPumpShutdownEvent = new ManualResetEvent(initialState: false);
            PacketPumpCompleted = new ManualResetEvent(initialState: false);
            PacketReceivedEvent = new AutoResetEvent(initialState: false);
            ReceivedPacketsQueue = new ConcurrentQueue<INodePacket>();

            // Packet routing, plus the reusable buffer the read translator deserializes from.
            _packetFactory = new NodePacketFactory();
            _readBufferMemoryStream = new MemoryStream();
            _binaryReadTranslator = BinaryTranslator.GetReadTranslator(
                _readBufferMemoryStream,
                InterningBinaryReader.CreateSharedBuffer());

            // Connection state.
            _stream = stream;
            _isServerDisconnecting = false;
        }
 
        #region INodePacketFactory Members
 
        /// <summary>
        /// Registers a packet handler by forwarding the registration to the underlying packet factory.
        /// </summary>
        /// <param name="packetType">The packet type for which the handler should be registered.</param>
        /// <param name="factory">The factory used to create packets.</param>
        /// <param name="handler">The handler for the packets.</param>
        public void RegisterPacketHandler(NodePacketType packetType, NodePacketFactoryMethod factory, INodePacketHandler handler)
            => _packetFactory.RegisterPacketHandler(packetType, factory, handler);
 
        /// <summary>
        /// Unregisters a packet handler by forwarding the request to the underlying packet factory.
        /// </summary>
        /// <param name="packetType">The type of packet for which the handler should be unregistered.</param>
        public void UnregisterPacketHandler(NodePacketType packetType)
            => _packetFactory.UnregisterPacketHandler(packetType);
 
        /// <summary>
        /// Deserializes a packet and routes it to the appropriate handler, delegating to the underlying packet factory.
        /// </summary>
        /// <param name="nodeId">The node from which the packet was received.</param>
        /// <param name="packetType">The packet type.</param>
        /// <param name="translator">The translator to use as a source for packet data.</param>
        public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
            => _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
 
        /// <summary>
        /// Routes an already deserialized packet to the appropriate handler, delegating to the underlying packet factory.
        /// </summary>
        /// <param name="nodeId">The node id from which the packet was received.</param>
        /// <param name="packet">The packet to route.</param>
        public void RoutePacket(int nodeId, INodePacket packet)
            => _packetFactory.RoutePacket(nodeId, packet);
 
        #endregion
 
        #region INodePacketHandler Members
 
        /// <summary>
        /// Called when a packet has been received. Puts the packet into <see cref="ReceivedPacketsQueue"/>
        /// and signals <see cref="PacketReceivedEvent"/>.
        /// </summary>
        /// <param name="node">The node from which the packet was received. Not used by this implementation.</param>
        /// <param name="packet">The packet.</param>
        public void PacketReceived(int node, INodePacket packet)
        {
            // Enqueue before signalling, so the packet is already in the queue when a waiter wakes up.
            ReceivedPacketsQueue.Enqueue(packet);
            PacketReceivedEvent.Set();
        }
 
        #endregion
 
        #region Packet Pump
        /// <summary>
        /// Creates the packet pump thread as a named background thread and starts it.
        /// </summary>
        public void Start()
        {
            Thread pumpThread = new Thread(PacketPumpProc);
            pumpThread.IsBackground = true;
            pumpThread.Name = "MSBuild Client Packet Pump";

            // Publish the fully configured thread before it starts running.
            _packetPumpThread = pumpThread;
            pumpThread.Start();
        }
 
        /// <summary>
        /// Requests shutdown of the packet pump and waits for the pump thread, if one was started, to finish.
        /// </summary>
        public void Stop()
        {
            _packetPumpShutdownEvent.Set();

            // The thread only exists once Start has been called.
            Thread? pumpThread = _packetPumpThread;
            if (pumpThread is not null)
            {
                pumpThread.Join();
            }
        }
 
        /// <summary>
        /// Entry point of the packet pump thread. Runs the read loop over the pump's stream; the loop
        /// terminates when the shutdown event is set.
        /// </summary>
        /// <remarks>
        /// Instead of throwing an exception, puts it in <see cref="PacketPumpException"/> and raises event <see cref="PacketPumpCompleted"/>.
        /// </remarks>
        private void PacketPumpProc() => RunReadLoop(_stream, _packetPumpShutdownEvent);
 
        /// <summary>
        /// Reads packets from <paramref name="localStream"/> and routes them through the packet factory until
        /// shutdown is requested, a <see cref="NodePacketType.ServerNodeBuildResult"/> packet has been routed,
        /// the server disconnects after <see cref="ServerWillDisconnect"/> was called, or an error occurs.
        /// </summary>
        /// <remarks>
        /// Each packet starts with a 5 byte header: 1 byte of packet type followed by a 4 byte little-endian
        /// body length. Exceptions are not propagated to the caller; they are stored in
        /// <see cref="PacketPumpException"/>. <see cref="PacketPumpCompleted"/> is set when the loop ends,
        /// regardless of the reason.
        /// </remarks>
        /// <param name="localStream">The stream from which packets are read.</param>
        /// <param name="localPacketPumpShutdownEvent">The event which, once set, makes the loop terminate.</param>
        private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent)
        {
            CommunicationsUtilities.Trace("Entering read loop.");

            try
            {
                byte[] headerByte = new byte[5];
#if FEATURE_APM
                IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
                Task<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length);
#endif

                bool continueReading = true;
                do
                {
                    // Ordering of the wait handles is important. The first signalled wait handle in the array
                    // will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
                    // terminate event triggered so that we cannot get into a situation where packets are being
                    // spammed to the client and it never gets an opportunity to shutdown.
                    WaitHandle[] handles =
                    [
                        localPacketPumpShutdownEvent,
#if FEATURE_APM
                        result.AsyncWaitHandle
#else
                        ((IAsyncResult)readTask).AsyncWaitHandle
#endif
                    ];
                    int waitId = WaitHandle.WaitAny(handles);
                    switch (waitId)
                    {
                        case 0:
                            // Fulfill the request for shutdown of the message pump.
                            CommunicationsUtilities.Trace("Shutdown message pump thread.");
                            continueReading = false;
                            break;

                        case 1:
                            {
                                // The pending header read has completed. Validate it, then read the rest of the packet.
                                int headerBytesRead;
#if FEATURE_APM
                                headerBytesRead = localStream.EndRead(result);
#else
                                headerBytesRead = readTask.Result;
#endif

                                if (headerBytesRead != headerByte.Length)
                                {
                                    if (localPacketPumpShutdownEvent.WaitOne(0))
                                    {
                                        // Shutdown was requested while the header was still incomplete. The header
                                        // buffer does not hold a complete header (it may still contain bytes of the
                                        // previous one), so it must not be parsed; just leave the loop.
                                        CommunicationsUtilities.Trace("Shutdown message pump thread.");
                                        continueReading = false;
                                        break;
                                    }

                                    // Incomplete read. Abort.
                                    if (headerBytesRead == 0)
                                    {
                                        if (_isServerDisconnecting)
                                        {
                                            // The disconnect was announced, so this is a graceful end of the stream.
                                            continueReading = false;
                                            break;
                                        }

                                        ErrorUtilities.ThrowInternalError("Server disconnected abruptly");
                                    }
                                    else
                                    {
                                        ErrorUtilities.ThrowInternalError("Incomplete header read.  {0} of {1} bytes read", headerBytesRead, headerByte.Length);
                                    }
                                }

                                // Header layout: byte 0 is the packet type, bytes 1-4 the little-endian body length.
                                NodePacketType packetType = (NodePacketType)headerByte[0];

                                int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(headerByte, 1, 4));
                                int packetBytesRead = 0;

                                // Read the body straight into the memory stream's backing buffer; the read translator
                                // was created over this stream and deserializes starting from position 0.
                                _readBufferMemoryStream.Position = 0;
                                _readBufferMemoryStream.SetLength(packetLength);
                                byte[] packetData = _readBufferMemoryStream.GetBuffer();

                                while (packetBytesRead < packetLength)
                                {
                                    int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead);
                                    if (bytesRead == 0)
                                    {
                                        // Incomplete read.  Abort.
                                        ErrorUtilities.ThrowInternalError("Incomplete packet read. {0} of {1} bytes read", packetBytesRead, packetLength);
                                    }

                                    packetBytesRead += bytesRead;
                                }

                                try
                                {
                                    _packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator);
                                }
                                catch
                                {
                                    // Error while deserializing or handling packet. Logging additional info.
                                    CommunicationsUtilities.Trace("Packet factory failed to receive package. Exception while deserializing packet {0}.", packetType);
                                    throw;
                                }

                                if (packetType == NodePacketType.ServerNodeBuildResult)
                                {
                                    // No further header read is started after a build result packet.
                                    continueReading = false;
                                }
                                else
                                {
                                    // Start reading the next packet header.
#if FEATURE_APM
                                    result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
                                    readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length);
#endif
                                }
                            }
                            break;

                        default:
                            ErrorUtilities.ThrowInternalError("WaitId {0} out of range.", waitId);
                            break;
                    }
                }
                while (continueReading);
            }
            catch (Exception ex)
            {
                // Do not let the exception escape the pump thread; hand it over through PacketPumpException.
                CommunicationsUtilities.Trace("Exception occurred in the packet pump: {0}", ex);
                PacketPumpException = ex;
            }

            CommunicationsUtilities.Trace("Ending read loop.");
            PacketPumpCompleted.Set();
        }
        #endregion
 
        /// <summary>
        /// Stops the packet pump; equivalent to calling <see cref="Stop"/>.
        /// </summary>
        public void Dispose()
        {
            Stop();
        }
 
        /// <summary>
        /// Signals that from now on the server is expected to break the connected named pipe.
        /// </summary>
        public void ServerWillDisconnect() => _isServerDisconnecting = true;
    }
}