File: BackEnd\Components\Communications\NodeProviderOutOfProcBase.cs
Web Access
Project: ..\..\..\src\Build\Microsoft.Build.csproj (Microsoft.Build)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
#endif
 
#if FEATURE_APM
using Microsoft.Build.Eventing;
#else
using System.Threading;
#endif
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
using Task = System.Threading.Tasks.Task;
using Microsoft.Build.Framework;
using Microsoft.Build.BackEnd.Logging;
 
#nullable disable
 
namespace Microsoft.Build.BackEnd
{
    /// <summary>
    /// Contains the shared pieces of code from NodeProviderOutOfProc
    /// and NodeProviderOutOfProcTaskHost.
    /// </summary>
    internal abstract class NodeProviderOutOfProcBase
    {
        /// <summary>
        /// The maximum number of bytes to write
        /// </summary>
        private const int MaxPacketWriteSize = 1048576;
 
        /// <summary>
        /// The number of times to retry creating an out-of-proc node.
        /// </summary>
        private const int NodeCreationRetries = 10;
 
        /// <summary>
        /// The amount of time to wait for an out-of-proc node to spool up before we give up.
        /// </summary>
        private const int TimeoutForNewNodeCreation = 30000;
 
        /// <summary>
        /// The amount of time to wait for an out-of-proc node to exit.
        /// </summary>
        private const int TimeoutForWaitForExit = 30000;
 
        /// <summary>
        /// The build component host.
        /// </summary>
        private IBuildComponentHost _componentHost;
 
        /// <summary>
        /// Keeps track of the processes we've already checked for nodes so we don't check them again.
        /// We decided to use ConcurrentDictionary of(string, byte) as common implementation of ConcurrentHashSet.
        /// </summary>
        private readonly ConcurrentDictionary<string, byte /*void*/> _processesToIgnore = new();
 
        /// <summary>
        /// Delegate used to tell the node provider that a context has been created.
        /// </summary>
        /// <param name="context">The created node context.</param>
        internal delegate void NodeContextCreatedDelegate(NodeContext context);
 
        /// <summary>
        /// Delegate used to tell the node provider that a context has terminated.
        /// </summary>
        /// <param name="nodeId">The id of the node which terminated.</param>
        internal delegate void NodeContextTerminateDelegate(int nodeId);
 
        /// <summary>
        /// The build component host.
        /// </summary>
        protected IBuildComponentHost ComponentHost
        {
            get { return _componentHost; }
            set { _componentHost = value; }
        }
 
        /// <summary>
        /// Sends data to the specified node.
        /// </summary>
        /// <param name="context">The node to which data shall be sent.</param>
        /// <param name="packet">The packet to send.</param>
        protected void SendData(NodeContext context, INodePacket packet)
        {
            ErrorUtilities.VerifyThrowArgumentNull(packet);
            context.SendData(packet);
        }
 
        /// <summary>
        /// Shuts down all of the connected managed nodes.
        /// </summary>
        /// <param name="contextsToShutDown">List of the contexts to be shut down</param>
        /// <param name="enableReuse">Flag indicating if nodes should prepare for reuse.</param>
        protected void ShutdownConnectedNodes(List<NodeContext> contextsToShutDown, bool enableReuse)
        {
            // Send the build completion message to the nodes, causing them to shutdown or reset.
            _processesToIgnore.Clear();
 
            // We wait for child nodes to exit to avoid them changing the terminal
            // after this process terminates.
            bool waitForExit = !enableReuse &&
                                !Console.IsInputRedirected &&
                                Traits.Instance.EscapeHatches.EnsureStdOutForChildNodesIsPrimaryStdout;
 
            Task[] waitForExitTasks = waitForExit && contextsToShutDown.Count > 0 ? new Task[contextsToShutDown.Count] : null;
            int i = 0;
            var loggingService = _componentHost.LoggingService;
            foreach (NodeContext nodeContext in contextsToShutDown)
            {
                if (nodeContext is null)
                {
                    continue;
                }
                nodeContext.SendData(new NodeBuildComplete(enableReuse));
                if (waitForExit)
                {
                    waitForExitTasks[i++] = nodeContext.WaitForExitAsync(loggingService);
                }
            }
            if (waitForExitTasks != null)
            {
                Task.WaitAll(waitForExitTasks);
            }
        }
 
        /// <summary>
        /// Shuts down all of the managed nodes permanently.
        /// </summary>
        /// <param name="nodeReuse">Whether to reuse the node</param>
        /// <param name="terminateNode">Delegate used to tell the node provider that a context has terminated</param>
        protected void ShutdownAllNodes(bool nodeReuse, NodeContextTerminateDelegate terminateNode)
        {
            // INodePacketFactory
            INodePacketFactory factory = new NodePacketFactory();
 
            List<Process> nodeProcesses = GetPossibleRunningNodes().nodeProcesses.ToList();
 
            // Find proper MSBuildTaskHost executable name
            string msbuildtaskhostExeName = NodeProviderOutOfProcTaskHost.TaskHostNameForClr2TaskHost;
 
            // Search for all instances of msbuildtaskhost process and add them to the process list
            nodeProcesses.AddRange(new List<Process>(Process.GetProcessesByName(Path.GetFileNameWithoutExtension(msbuildtaskhostExeName))));
 
            // For all processes in the list, send signal to terminate if able to connect
            foreach (Process nodeProcess in nodeProcesses)
            {
                // A 2013 comment suggested some nodes take this long to respond, so a smaller timeout would miss nodes.
                int timeout = 30;
 
                // Attempt to connect to the process with the handshake without low priority.
                Stream nodeStream = TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, false));
 
                if (nodeStream == null)
                {
                    // If we couldn't connect attempt to connect to the process with the handshake including low priority.
                    nodeStream = TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, true));
                }
 
                if (nodeStream != null)
                {
                    // If we're able to connect to such a process, send a packet requesting its termination
                    CommunicationsUtilities.Trace("Shutting down node with pid = {0}", nodeProcess.Id);
                    NodeContext nodeContext = new NodeContext(0, nodeProcess, nodeStream, factory, terminateNode);
                    nodeContext.SendData(new NodeBuildComplete(false /* no node reuse */));
                    nodeStream.Dispose();
                }
            }
        }
 
        /// <summary>
        /// Finds or creates a child processes which can act as a node.
        /// </summary>
        protected IList<NodeContext> GetNodes(string msbuildLocation,
            string commandLineArgs,
            int nextNodeId,
            INodePacketFactory factory,
            Handshake hostHandshake,
            NodeContextCreatedDelegate createNode,
            NodeContextTerminateDelegate terminateNode,
            int numberOfNodesToCreate)
        {
#if DEBUG
            if (Execution.BuildManager.WaitForDebugger)
            {
                commandLineArgs += " /wfd";
            }
#endif
 
            if (String.IsNullOrEmpty(msbuildLocation))
            {
                msbuildLocation = _componentHost.BuildParameters.NodeExeLocation;
            }
 
            if (String.IsNullOrEmpty(msbuildLocation))
            {
                string msbuildExeName = Environment.GetEnvironmentVariable("MSBUILD_EXE_NAME");
 
                if (!String.IsNullOrEmpty(msbuildExeName))
                {
                    // we assume that MSBUILD_EXE_NAME is, in fact, just the name.
                    msbuildLocation = Path.Combine(msbuildExeName, ".exe");
                }
            }
 
            // Get all process of possible running node processes for reuse and put them into ConcurrentQueue.
            // Processes from this queue will be concurrently consumed by TryReusePossibleRunningNodes while
            //    trying to connect to them and reuse them. When queue is empty, no process to reuse left
            //    new node process will be started.
            string expectedProcessName = null;
            ConcurrentQueue<Process> possibleRunningNodes = null;
#if FEATURE_NODE_REUSE
            // Try to connect to idle nodes if node reuse is enabled.
            if (_componentHost.BuildParameters.EnableNodeReuse)
            {
                IList<Process> possibleRunningNodesList;
                (expectedProcessName, possibleRunningNodesList) = GetPossibleRunningNodes(msbuildLocation);
                possibleRunningNodes = new ConcurrentQueue<Process>(possibleRunningNodesList);
 
                if (possibleRunningNodesList.Count > 0)
                {
                    CommunicationsUtilities.Trace("Attempting to connect to {1} existing processes '{0}'...", expectedProcessName, possibleRunningNodesList.Count);
                }
            }
#endif
            ConcurrentQueue<NodeContext> nodeContexts = new();
            ConcurrentQueue<Exception> exceptions = new();
            Parallel.For(nextNodeId, nextNodeId + numberOfNodesToCreate, (nodeId) =>
            {
                try
                {
                    if (!TryReuseAnyFromPossibleRunningNodes(nodeId) && !StartNewNode(nodeId))
                    {
                        // We were unable to reuse or launch a node.
                        CommunicationsUtilities.Trace("FAILED TO CONNECT TO A CHILD NODE");
                    }
                }
                catch (Exception ex)
                {
                    // It will be rethrown as aggregate exception
                    exceptions.Enqueue(ex);
                }
            });
            if (!exceptions.IsEmpty)
            {
                ErrorUtilities.ThrowInternalError("Cannot acquire required number of nodes.", new AggregateException(exceptions.ToArray()));
            }
 
            return nodeContexts.ToList();
 
            bool TryReuseAnyFromPossibleRunningNodes(int nodeId)
            {
                while (possibleRunningNodes != null && possibleRunningNodes.TryDequeue(out var nodeToReuse))
                {
                    CommunicationsUtilities.Trace("Trying to connect to existing process {2} with id {1} to establish node {0}...", nodeId, nodeToReuse.Id, nodeToReuse.ProcessName);
                    if (nodeToReuse.Id == Process.GetCurrentProcess().Id)
                    {
                        continue;
                    }
 
                    // Get the full context of this inspection so that we can always skip this process when we have the same taskhost context
                    string nodeLookupKey = GetProcessesToIgnoreKey(hostHandshake, nodeToReuse.Id);
                    if (_processesToIgnore.ContainsKey(nodeLookupKey))
                    {
                        continue;
                    }
 
                    // We don't need to check this again
                    _processesToIgnore.TryAdd(nodeLookupKey, default);
 
                    // Attempt to connect to each process in turn.
                    Stream nodeStream = TryConnectToProcess(nodeToReuse.Id, 0 /* poll, don't wait for connections */, hostHandshake);
                    if (nodeStream != null)
                    {
                        // Connection successful, use this node.
                        CommunicationsUtilities.Trace("Successfully connected to existed node {0} which is PID {1}", nodeId, nodeToReuse.Id);
                        string msg = ResourceUtilities.FormatResourceStringIgnoreCodeAndKeyword("NodeReused", nodeId, nodeToReuse.Id);
                        _componentHost.LoggingService.LogBuildEvent(new BuildMessageEventArgs(msg, null, null, MessageImportance.Low)
                        {
                            BuildEventContext = new BuildEventContext(nodeId, BuildEventContext.InvalidTargetId, BuildEventContext.InvalidProjectContextId, BuildEventContext.InvalidTaskId)
                        });
 
                        CreateNodeContext(nodeId, nodeToReuse, nodeStream);
                        return true;
                    }
                }
 
                return false;
            }
 
            // Create a new node process.
            bool StartNewNode(int nodeId)
            {
                CommunicationsUtilities.Trace("Could not connect to existing process, now creating a process...");
 
                // We try this in a loop because it is possible that there is another MSBuild multiproc
                // host process running somewhere which is also trying to create nodes right now.  It might
                // find our newly created node and connect to it before we get a chance.
                int retries = NodeCreationRetries;
                while (retries-- > 0)
                {
#if FEATURE_NET35_TASKHOST
                    // We will also check to see if .NET 3.5 is installed in the case where we need to launch a CLR2 OOP TaskHost.
                    // Failure to detect this has been known to stall builds when Windows pops up a related dialog.
                    // It's also a waste of time when we attempt several times to launch multiple MSBuildTaskHost.exe (CLR2 TaskHost)
                    // nodes because we should never be able to connect in this case.
                    string taskHostNameForClr2TaskHost = Path.GetFileNameWithoutExtension(NodeProviderOutOfProcTaskHost.TaskHostNameForClr2TaskHost);
                    if (Path.GetFileNameWithoutExtension(msbuildLocation).Equals(taskHostNameForClr2TaskHost, StringComparison.OrdinalIgnoreCase))
                    {
                        if (FrameworkLocationHelper.GetPathToDotNetFrameworkV35(DotNetFrameworkArchitecture.Current) == null)
                        {
                            CommunicationsUtilities.Trace(
                                "Failed to launch node from {0}. The required .NET Framework v3.5 is not installed or enabled. CommandLine: {1}",
                                msbuildLocation,
                                commandLineArgs);
 
                            string nodeFailedToLaunchError = ResourceUtilities.GetResourceString("TaskHostNodeFailedToLaunchErrorCodeNet35NotInstalled");
                            throw new NodeFailedToLaunchException(null, nodeFailedToLaunchError);
                        }
                    }
#endif
                    // Create the node process
                    INodeLauncher nodeLauncher = (INodeLauncher)_componentHost.GetComponent(BuildComponentType.NodeLauncher);
                    Process msbuildProcess = nodeLauncher.Start(msbuildLocation, commandLineArgs, nodeId);
                    _processesToIgnore.TryAdd(GetProcessesToIgnoreKey(hostHandshake, msbuildProcess.Id), default);
 
                    // Note, when running under IMAGEFILEEXECUTIONOPTIONS registry key to debug, the process ID
                    // gotten back from CreateProcess is that of the debugger, which causes this to try to connect
                    // to the debugger process. Instead, use MSBUILDDEBUGONSTART=1
 
                    // Now try to connect to it.
                    Stream nodeStream = TryConnectToProcess(msbuildProcess.Id, TimeoutForNewNodeCreation, hostHandshake);
                    if (nodeStream != null)
                    {
                        // Connection successful, use this node.
                        CommunicationsUtilities.Trace("Successfully connected to created node {0} which is PID {1}", nodeId, msbuildProcess.Id);
 
                        CreateNodeContext(nodeId, msbuildProcess, nodeStream);
                        return true;
                    }
 
                    if (msbuildProcess.HasExited)
                    {
                        if (Traits.Instance.DebugNodeCommunication)
                        {
                            try
                            {
                                CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it has exited with exit code {1}. This can indicate a crash at startup", msbuildProcess.Id, msbuildProcess.ExitCode);
                            }
                            catch (InvalidOperationException)
                            {
                                // This case is common on Windows where we called CreateProcess and the Process object
                                // can't get the exit code.
                                CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it has exited with unknown exit code. This can indicate a crash at startup", msbuildProcess.Id);
                            }
                        }
                    }
                    else
                    {
                        CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it is still running. This can occur when two multiprocess builds run in parallel and the other one 'stole' this node", msbuildProcess.Id);
                    }
                }
 
                return false;
            }
 
            void CreateNodeContext(int nodeId, Process nodeToReuse, Stream nodeStream)
            {
                NodeContext nodeContext = new(nodeId, nodeToReuse, nodeStream, factory, terminateNode);
                nodeContexts.Enqueue(nodeContext);
                createNode(nodeContext);
            }
        }
 
        /// <summary>
        /// Finds processes named after either msbuild or msbuildtaskhost.
        /// </summary>
        /// <param name="msbuildLocation"></param>
        /// <returns>
        /// Item 1 is the name of the process being searched for.
        /// Item 2 is the ConcurrentQueue of ordered processes themselves.
        /// </returns>
        private (string expectedProcessName, IList<Process> nodeProcesses) GetPossibleRunningNodes(string msbuildLocation = null)
        {
            if (String.IsNullOrEmpty(msbuildLocation))
            {
                msbuildLocation = "MSBuild.exe";
            }
 
            var expectedProcessName = Path.GetFileNameWithoutExtension(CurrentHost.GetCurrentHost() ?? msbuildLocation);
 
            var processes = Process.GetProcessesByName(expectedProcessName);
            Array.Sort(processes, (left, right) => left.Id.CompareTo(right.Id));
 
            return (expectedProcessName, processes);
        }
 
        /// <summary>
        /// Generate a string from task host context and the remote process to be used as key to lookup processes we have already
        /// attempted to connect to or are already connected to
        /// </summary>
        private string GetProcessesToIgnoreKey(Handshake hostHandshake, int nodeProcessId)
        {
            return hostHandshake.ToString() + "|" + nodeProcessId.ToString(CultureInfo.InvariantCulture);
        }
 
#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY
        // This code needs to be in a separate method so that we don't try (and fail) to load the Windows-only APIs when JIT-ing the code
        //  on non-Windows operating systems
        private static void ValidateRemotePipeSecurityOnWindows(NamedPipeClientStream nodeStream)
        {
            SecurityIdentifier identifier = WindowsIdentity.GetCurrent().Owner;
#if FEATURE_PIPE_SECURITY
            PipeSecurity remoteSecurity = nodeStream.GetAccessControl();
#else
            var remoteSecurity = new PipeSecurity(nodeStream.SafePipeHandle, System.Security.AccessControl.AccessControlSections.Access |
                System.Security.AccessControl.AccessControlSections.Owner | System.Security.AccessControl.AccessControlSections.Group);
#endif
            IdentityReference remoteOwner = remoteSecurity.GetOwner(typeof(SecurityIdentifier));
            if (remoteOwner != identifier)
            {
                CommunicationsUtilities.Trace("The remote pipe owner {0} does not match {1}", remoteOwner.Value, identifier.Value);
                throw new UnauthorizedAccessException();
            }
        }
#endif
 
        /// <summary>
        /// Attempts to connect to the specified process.
        /// </summary>
        private Stream TryConnectToProcess(int nodeProcessId, int timeout, Handshake handshake)
        {
            // Try and connect to the process.
            string pipeName = NamedPipeUtil.GetPlatformSpecificPipeName(nodeProcessId);
 
#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter
            NamedPipeClientStream nodeStream = new NamedPipeClientStream(
                serverName: ".",
                pipeName,
                PipeDirection.InOut,
                PipeOptions.Asynchronous
#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY
                | PipeOptions.CurrentUserOnly
#endif
            );
#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter
            CommunicationsUtilities.Trace("Attempting connect to PID {0} with pipe {1} with timeout {2} ms", nodeProcessId, pipeName, timeout);
 
            try
            {
                ConnectToPipeStream(nodeStream, pipeName, handshake, timeout);
                return nodeStream;
            }
            catch (Exception e) when (!ExceptionHandling.IsCriticalException(e))
            {
                // Can be:
                // UnauthorizedAccessException -- Couldn't connect, might not be a node.
                // IOException -- Couldn't connect, already in use.
                // TimeoutException -- Couldn't connect, might not be a node.
                // InvalidOperationException – Couldn’t connect, probably a different build
                CommunicationsUtilities.Trace("Failed to connect to pipe {0}. {1}", pipeName, e.Message.TrimEnd());
 
                // If we don't close any stream, we might hang up the child
                nodeStream?.Dispose();
            }
 
            return null;
        }
 
        /// <summary>
        /// Connect to named pipe stream and ensure validate handshake and security.
        /// </summary>
        /// <remarks>
        /// Reused by MSBuild server client <see cref="Microsoft.Build.Experimental.MSBuildClient"/>.
        /// </remarks>
        internal static void ConnectToPipeStream(NamedPipeClientStream nodeStream, string pipeName, Handshake handshake, int timeout)
        {
            nodeStream.Connect(timeout);
 
#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY
            if (NativeMethodsShared.IsWindows)
            {
                // Verify that the owner of the pipe is us.  This prevents a security hole where a remote node has
                // been faked up with ACLs that would let us attach to it.  It could then issue fake build requests back to
                // us, potentially causing us to execute builds that do harmful or unexpected things.  The pipe owner can
                // only be set to the user's own SID by a normal, unprivileged process.  The conditions where a faked up
                // remote node could set the owner to something else would also let it change owners on other objects, so
                // this would be a security flaw upstream of us.
                ValidateRemotePipeSecurityOnWindows(nodeStream);
            }
#endif
 
            int[] handshakeComponents = handshake.RetrieveHandshakeComponents();
            for (int i = 0; i < handshakeComponents.Length; i++)
            {
                CommunicationsUtilities.Trace("Writing handshake part {0} ({1}) to pipe {2}", i, handshakeComponents[i], pipeName);
                nodeStream.WriteIntForHandshake(handshakeComponents[i]);
            }
 
            // This indicates that we have finished all the parts of our handshake; hopefully the endpoint has as well.
            nodeStream.WriteEndOfHandshakeSignal();
 
            CommunicationsUtilities.Trace("Reading handshake from pipe {0}", pipeName);
 
#if NETCOREAPP2_1_OR_GREATER
            nodeStream.ReadEndOfHandshakeSignal(true, timeout);
#else
            nodeStream.ReadEndOfHandshakeSignal(true);
#endif
            // We got a connection.
            CommunicationsUtilities.Trace("Successfully connected to pipe {0}...!", pipeName);
        }
 
        /// <summary>
        /// Class which wraps up the communications infrastructure for a given node.
        /// </summary>
        internal class NodeContext
        {
            private enum ExitPacketState
            {
                None,
                ExitPacketQueued,
                ExitPacketSent
            }
 
            // The pipe(s) used to communicate with the node.
            private Stream _clientToServerStream;
            private Stream _serverToClientStream;
 
            /// <summary>
            /// The factory used to create packets from data read off the pipe.
            /// </summary>
            private INodePacketFactory _packetFactory;
 
            /// <summary>
            /// The node id assigned by the node provider.
            /// </summary>
            private int _nodeId;
 
            /// <summary>
            /// The node process.
            /// </summary>
            private readonly Process _process;
 
            internal Process Process { get { return _process; } }
 
            /// <summary>
            /// An array used to store the header byte for each packet when read.
            /// </summary>
            private byte[] _headerByte;
 
            /// <summary>
            /// A buffer typically big enough to handle a packet body.
            /// We use this as a convenient way to manage and cache a byte[] that's resized
            /// automatically to fit our payload.
            /// </summary>
            private MemoryStream _readBufferMemoryStream;
 
            /// <summary>
            /// A reusable buffer for writing packets.
            /// </summary>
            private MemoryStream _writeBufferMemoryStream;
 
            /// <summary>
            /// A queue used for enqueuing packets to write to the stream asynchronously.
            /// </summary>
            private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();
 
            /// <summary>
            /// A task representing the last packet write, so we can chain packet writes one after another.
            /// We want to queue up writing packets on a separate thread asynchronously, but serially.
            /// Each task drains the <see cref="_packetWriteQueue"/>
            /// </summary>
            private Task _packetWriteDrainTask = Task.CompletedTask;
 
            /// <summary>
            /// Delegate called when the context terminates.
            /// </summary>
            private NodeContextTerminateDelegate _terminateDelegate;
 
            /// <summary>
            /// Tracks the state of the packet sent to terminate the node.
            /// </summary>
            private ExitPacketState _exitPacketState;
 
            /// <summary>
            /// Per node read buffers
            /// </summary>
            private BinaryReaderFactory _binaryReaderFactory;
 
            /// <summary>
            /// Constructor.
            /// </summary>
            public NodeContext(int nodeId, Process process,
                Stream nodePipe,
                INodePacketFactory factory, NodeContextTerminateDelegate terminateDelegate)
            {
                _nodeId = nodeId;
                _process = process;
                _clientToServerStream = nodePipe;
                _serverToClientStream = nodePipe;
                _packetFactory = factory;
                _headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
                _readBufferMemoryStream = new MemoryStream();
                _writeBufferMemoryStream = new MemoryStream();
                _terminateDelegate = terminateDelegate;
                _binaryReaderFactory = InterningBinaryReader.CreateSharedBuffer();
            }
 
            /// <summary>
            /// Id of node.
            /// </summary>
            public int NodeId => _nodeId;
 
            /// <summary>
            /// Starts a new asynchronous read operation for this node.
            /// </summary>
            public void BeginAsyncPacketRead()
            {
#if FEATURE_APM
                _clientToServerStream.BeginRead(_headerByte, 0, _headerByte.Length, HeaderReadComplete, this);
#else
                ThreadPool.QueueUserWorkItem(delegate
                {
                    var ignored = RunPacketReadLoopAsync();
                });
#endif
            }
 
#if !FEATURE_APM
            public async Task RunPacketReadLoopAsync()
            {
                while (true)
                {
                    try
                    {
                        int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, _headerByte, _headerByte.Length);
                        if (!ProcessHeaderBytesRead(bytesRead))
                        {
                            return;
                        }
                    }
                    catch (IOException e)
                    {
                        CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync: {0}", e);
                        _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                        Close();
                        return;
                    }
 
                    NodePacketType packetType = (NodePacketType)_headerByte[0];
                    int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
 
                    _readBufferMemoryStream.SetLength(packetLength);
                    byte[] packetData = _readBufferMemoryStream.GetBuffer();
 
                    try
                    {
                        int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, packetData, packetLength);
                        if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType))
                        {
                            return;
                        }
                    }
                    catch (IOException e)
                    {
                        CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync (Reading): {0}", e);
                        _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                        Close();
                        return;
                    }
 
                    // Read and route the packet.
                    if (!ReadAndRoutePacket(packetType, packetData, packetLength))
                    {
                        return;
                    }
 
                    if (packetType == NodePacketType.NodeShutdown)
                    {
                        Close();
                        return;
                    }
                }
            }
#endif
 
            /// <summary>
            /// Sends the specified packet to this node asynchronously.
            /// The method enqueues a task to write the packet and returns
            /// immediately. This is because SendData() is on a hot path
            /// under the primary lock (BuildManager's _syncLock)
            /// and we want to minimize our time there.
            /// </summary>
            /// <param name="packet">The packet to send.</param>
            public void SendData(INodePacket packet)
            {
                if (IsExitPacket(packet))
                {
                    _exitPacketState = ExitPacketState.ExitPacketQueued;
                }
                _packetWriteQueue.Add(packet);
                DrainPacketQueue();
            }
 
            /// <summary>
            /// Schedule a task to drain the packet write queue. We could have had a
            /// dedicated thread that would pump the queue constantly, but
            /// we don't want to allocate a dedicated thread per node (1MB stack)
            /// </summary>
            /// <remarks>Usually there'll be a single packet in the queue, but sometimes
            /// a burst of SendData comes in, with 10-20 packets scheduled. In this case
            /// the first scheduled task will drain all of them, and subsequent tasks
            /// will run on an empty queue. I tried to write logic that avoids queueing
            /// a new task if the queue is already being drained, but it didn't show any
            /// improvement and made things more complicated.</remarks>
            private void DrainPacketQueue()
            {
                // this lock is only necessary to protect a write to _packetWriteDrainTask field
                lock (_packetWriteQueue)
                {
                    // average latency between the moment this runs and when the delegate starts
                    // running is about 100-200 microseconds (unless there's thread pool saturation)
                    _packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
                    {
                        while (_packetWriteQueue.TryTake(out var packet))
                        {
                            SendDataCore(packet);
                        }
                    }, TaskScheduler.Default);
                }
            }
 
            /// <summary>
            /// Actually writes and sends the packet. This can't be called in parallel
            /// because it reuses the _writeBufferMemoryStream, and this is why we use
            /// the _packetWriteDrainTask to serially chain invocations one after another.
            /// </summary>
            /// <param name="packet">The packet to send.</param>
            private void SendDataCore(INodePacket packet)
            {
                MemoryStream writeStream = _writeBufferMemoryStream;
 
                // clear the buffer but keep the underlying capacity to avoid reallocations
                writeStream.SetLength(0);
 
                ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
                try
                {
                    writeStream.WriteByte((byte)packet.Type);
 
                    // Pad for the packet length
                    WriteInt32(writeStream, 0);
                    packet.Translate(writeTranslator);
 
                    int writeStreamLength = (int)writeStream.Position;
 
                    // Now plug in the real packet length
                    writeStream.Position = 1;
                    WriteInt32(writeStream, writeStreamLength - 5);
 
                    byte[] writeStreamBuffer = writeStream.GetBuffer();
 
                    for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
                    {
                        int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
                        _serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
                    }
                    if (IsExitPacket(packet))
                    {
                        _exitPacketState = ExitPacketState.ExitPacketSent;
                    }
                }
                catch (IOException e)
                {
                    // Do nothing here because any exception will be caught by the async read handler
                    CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
                }
                catch (ObjectDisposedException) // This happens if a child dies unexpectedly
                {
                    // Do nothing here because any exception will be caught by the async read handler
                }
            }
 
            private static bool IsExitPacket(INodePacket packet)
            {
                return packet is NodeBuildComplete buildCompletePacket && !buildCompletePacket.PrepareForReuse;
            }
 
            /// <summary>
            /// Avoid having a BinaryWriter just to write a 4-byte int
            /// </summary>
            private void WriteInt32(MemoryStream stream, int value)
            {
                stream.WriteByte((byte)value);
                stream.WriteByte((byte)(value >> 8));
                stream.WriteByte((byte)(value >> 16));
                stream.WriteByte((byte)(value >> 24));
            }
 
            /// <summary>
            /// Closes the node's context, disconnecting it from the node.
            /// </summary>
            private void Close()
            {
                _clientToServerStream.Dispose();
                if (!object.ReferenceEquals(_clientToServerStream, _serverToClientStream))
                {
                    _serverToClientStream.Dispose();
                }
                _terminateDelegate(_nodeId);
            }
 
            /// <summary>
            /// Waits for the child node process to exit.
            /// </summary>
            public async Task WaitForExitAsync(ILoggingService loggingService)
            {
                if (_exitPacketState == ExitPacketState.ExitPacketQueued)
                {
                    // Wait up to 100ms until all remaining packets are sent.
                    // We don't need to wait long, just long enough for the Task to start running on the ThreadPool.
                    await Task.WhenAny(_packetWriteDrainTask, Task.Delay(100));
                }
                if (_exitPacketState == ExitPacketState.ExitPacketSent)
                {
                    CommunicationsUtilities.Trace("Waiting for node with pid = {0} to exit", _process.Id);
 
                    // .NET 5 introduces a real WaitForExitAsyc.
                    // This is a poor man's implementation that uses polling.
                    int timeout = TimeoutForWaitForExit;
                    int delay = 5;
                    while (timeout > 0)
                    {
                        bool exited = _process.WaitForExit(milliseconds: 0);
                        if (exited)
                        {
                            return;
                        }
                        timeout -= delay;
                        await Task.Delay(delay).ConfigureAwait(false);
 
                        // Double delay up to 500ms.
                        delay = Math.Min(delay * 2, 500);
                    }
                }
 
                // Kill the child and do a blocking wait.
                loggingService?.LogWarning(
                    BuildEventContext.Invalid,
                    null,
                    BuildEventFileInfo.Empty,
                    "KillingProcessWithPid",
                    _process.Id);
                CommunicationsUtilities.Trace("Killing node with pid = {0}", _process.Id);
 
                _process.KillTree(timeoutMilliseconds: 5000);
            }
 
#if FEATURE_APM
            /// <summary>
            /// Completes the asynchronous packet write to the node.
            /// </summary>
            private void PacketWriteComplete(IAsyncResult result)
            {
                try
                {
                    _serverToClientStream.EndWrite(result);
                }
                catch (IOException)
                {
                    // Do nothing here because any exception will be caught by the async read handler
                }
            }
#endif
 
            private bool ProcessHeaderBytesRead(int bytesRead)
            {
                if (bytesRead != _headerByte.Length)
                {
                    CommunicationsUtilities.Trace(_nodeId, "COMMUNICATIONS ERROR (HRC) Node: {0} Process: {1} Bytes Read: {2} Expected: {3}", _nodeId, _process.Id, bytesRead, _headerByte.Length);
                    try
                    {
                        if (_process.HasExited)
                        {
                            CommunicationsUtilities.Trace(_nodeId, "   Child Process {0} has exited.", _process.Id);
                        }
                        else
                        {
                            CommunicationsUtilities.Trace(_nodeId, "   Child Process {0} is still running.", _process.Id);
                        }
                    }
                    catch (Exception e) when (!ExceptionHandling.IsCriticalException(e))
                    {
                        CommunicationsUtilities.Trace(_nodeId, "Unable to retrieve remote process information. {0}", e);
                    }
 
                    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                    Close();
                    return false;
                }
 
                return true;
            }
 
#if FEATURE_APM
            /// <summary>
            /// Callback invoked by the completion of a read of a header byte on one of the named pipes.
            /// </summary>
            private void HeaderReadComplete(IAsyncResult result)
            {
                int bytesRead;
                try
                {
                    try
                    {
                        bytesRead = _clientToServerStream.EndRead(result);
                    }
 
                    // Workaround for CLR stress bug; it sporadically calls us twice on the same async
                    // result, and EndRead will throw on the second one. Pretend the second one never happened.
                    catch (ArgumentException)
                    {
                        CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
                        return;
                    }
 
                    if (!ProcessHeaderBytesRead(bytesRead))
                    {
                        return;
                    }
                }
                catch (IOException e)
                {
                    CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in HeaderReadComplete: {0}", e);
                    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                    Close();
                    return;
                }
 
                int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
                MSBuildEventSource.Log.PacketReadSize(packetLength);
 
                // Ensures the buffer is at least this length.
                // It avoids reallocations if the buffer is already large enough.
                _readBufferMemoryStream.SetLength(packetLength);
                byte[] packetData = _readBufferMemoryStream.GetBuffer();
 
                _clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
            }
#endif
 
            private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketType packetType)
            {
                if (bytesRead != packetLength)
                {
                    CommunicationsUtilities.Trace(_nodeId, "Bad packet read for packet {0} - Expected {1} bytes, got {2}", packetType, packetLength, bytesRead);
                    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                    Close();
                    return false;
                }
                return true;
            }
 
            private bool ReadAndRoutePacket(NodePacketType packetType, byte[] packetData, int packetLength)
            {
                try
                {
                    // The buffer is publicly visible so that InterningBinaryReader doesn't have to copy to an intermediate buffer.
                    // Since the buffer is publicly visible dispose right away to discourage outsiders from holding a reference to it.
                    using (var packetStream = new MemoryStream(packetData, 0, packetLength, /*writeable*/ false, /*bufferIsPubliclyVisible*/ true))
                    {
                        ITranslator readTranslator = BinaryTranslator.GetReadTranslator(packetStream, _binaryReaderFactory);
                        _packetFactory.DeserializeAndRoutePacket(_nodeId, packetType, readTranslator);
                    }
                }
                catch (IOException e)
                {
                    CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in ReadAndRoutPacket: {0}", e);
                    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                    Close();
                    return false;
                }
                return true;
            }
 
#if FEATURE_APM
            /// <summary>
            /// Method called when the body of a packet has been read.
            /// </summary>
            private void BodyReadComplete(IAsyncResult result)
            {
                NodePacketType packetType = (NodePacketType)_headerByte[0];
                var state = (Tuple<byte[], int>)result.AsyncState;
                byte[] packetData = state.Item1;
                int packetLength = state.Item2;
                int bytesRead;
 
                try
                {
                    try
                    {
                        bytesRead = _clientToServerStream.EndRead(result);
                    }
 
                    // Workaround for CLR stress bug; it sporadically calls us twice on the same async
                    // result, and EndRead will throw on the second one. Pretend the second one never happened.
                    catch (ArgumentException)
                    {
                        CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
                        return;
                    }
 
                    if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType))
                    {
                        return;
                    }
                }
                catch (IOException e)
                {
                    CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in BodyReadComplete (Reading): {0}", e);
                    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
                    Close();
                    return;
                }
 
                // Read and route the packet.
                if (!ReadAndRoutePacket(packetType, packetData, packetLength))
                {
                    return;
                }
 
                if (packetType != NodePacketType.NodeShutdown)
                {
                    // Read the next packet.
                    BeginAsyncPacketRead();
                }
                else
                {
                    Close();
                }
            }
#endif
        }
    }
}