File: BackEnd\NodeEndpointOutOfProcTaskHost.cs
Web Access
Project: ..\..\..\src\MSBuildTaskHost\MSBuildTaskHost.csproj (MSBuildTaskHost)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipes;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Threading;
using Microsoft.Build.TaskHost.Collections;
using Microsoft.Build.TaskHost.Utilities;
 
namespace Microsoft.Build.TaskHost.BackEnd;
 
/// <summary>
/// This is an implementation of INodeEndpoint for the out-of-proc nodes.  It acts only as a client.
/// </summary>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope", Justification = "It is expected to keep the stream open for the process lifetime")]
internal sealed class NodeEndpointOutOfProcTaskHost : INodeEndpoint
{
    /// <summary>
    /// The size of the buffers to use for named pipes.
    /// </summary>
    private const int PipeBufferSize = 131072;
 
    /// <summary>
    /// The pipe client used by the nodes.
    /// </summary>
    private readonly NamedPipeServerStream _pipeServer;
 
    /// <summary>
    /// Per-node shared read buffer.
    /// </summary>
    private readonly BinaryReaderFactory _sharedReadBuffer;
 
    /// <summary>
    /// A way to cache a byte array when writing out packets.
    /// </summary>
    private readonly MemoryStream _packetStream;
 
    /// <summary>
    /// A binary writer to help write into <see cref="_packetStream"/>.
    /// </summary>
    private readonly BinaryWriter _binaryWriter;
 
    /// <summary>
    /// Represents the version of the parent packet associated with the node instantiation.
    /// </summary>
    private readonly byte _parentPacketVersion;
 
    /// <summary>
    /// The current communication status of the node.
    /// </summary>
    private LinkStatus _status;
 
    /// <summary>
    /// Set when a packet is available in the packet queue.
    /// </summary>
    private AutoResetEvent? _packetAvailable;
 
    /// <summary>
    /// Set when the asynchronous packet pump should terminate.
    /// </summary>
    private AutoResetEvent? _terminatePacketPump;
 
    /// <summary>
    /// True if this side is gracefully disconnecting.
    /// In such case we have sent last packet to client side and we expect
    /// client will soon broke pipe connection - unless server do it first.
    /// </summary>
    private bool _isClientDisconnecting;
 
    /// <summary>
    /// The thread which runs the asynchronous packet pump.
    /// </summary>
    private Thread? _packetPump;
 
    /// <summary>
    /// The factory used to create and route packets.
    /// </summary>
    private INodePacketFactory? _packetFactory;
 
    /// <summary>
    /// The asynchronous packet queue.
    /// </summary>
    /// <remarks>
    /// Operations on this queue must be synchronized since it is accessible by multiple threads.
    /// Use a lock on the packetQueue itself.
    /// </remarks>
    private ConcurrentQueue<INodePacket>? _packetQueue;
 
    public NodeEndpointOutOfProcTaskHost(byte parentPacketVersion)
    {
        _status = LinkStatus.Inactive;
        _sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
 
        _packetStream = new MemoryStream();
        _binaryWriter = new BinaryWriter(_packetStream);
        _parentPacketVersion = parentPacketVersion;
 
        string pipeName = $"MSBuild{EnvironmentUtilities.CurrentProcessId}";
 
        SecurityIdentifier identifier = WindowsIdentity.GetCurrent().Owner;
        var security = new PipeSecurity();
 
        // Restrict access to just this account.  We set the owner specifically here, and on the
        // pipe client side they will check the owner against this one - they must have identical
        // SIDs or the client will reject this server.  This is used to avoid attacks where a
        // hacked server creates a less restricted pipe in an attempt to lure us into using it and
        // then sending build requests to the real pipe client (which is the MSBuild Build Manager.)
        var rule = new PipeAccessRule(identifier, PipeAccessRights.ReadWrite, AccessControlType.Allow);
        security.AddAccessRule(rule);
        security.SetOwner(identifier);
 
        _pipeServer = new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous | PipeOptions.WriteThrough,
            inBufferSize: PipeBufferSize,
            outBufferSize: PipeBufferSize,
            security,
            HandleInheritability.None);
    }
 
    /// <summary>
    /// Raised when the link status has changed.
    /// </summary>
    public event LinkStatusChangedDelegate? OnLinkStatusChanged;
 
    /// <summary>
    /// Returns the link status of this node.
    /// </summary>
    public LinkStatus LinkStatus => _status;
 
    /// <summary>
    /// Causes this endpoint to wait for the remote endpoint to connect.
    /// </summary>
    /// <param name="factory">The factory used to create packets.</param>
    public void Listen(INodePacketFactory factory)
    {
        ErrorUtilities.VerifyThrow(_status == LinkStatus.Inactive, "Link not inactive.  Status is {0}", _status);
        ErrorUtilities.VerifyThrowArgumentNull(factory);
 
        _packetFactory = factory;
 
        _isClientDisconnecting = false;
        _packetPump = new Thread(PacketPumpProc)
        {
            IsBackground = true,
            Name = "OutOfProc Endpoint Packet Pump"
        };
 
        _packetAvailable = new AutoResetEvent(false);
        _terminatePacketPump = new AutoResetEvent(false);
        _packetQueue = new ConcurrentQueue<INodePacket>();
        _packetPump.Start();
    }
 
    /// <summary>
    /// Causes this node to connect to the matched endpoint.
    /// </summary>
    /// <param name="factory">The factory used to create packets.</param>
    public void Connect(INodePacketFactory factory)
        => ErrorUtilities.ThrowInternalError("Connect() not valid on the out of proc endpoint.");
 
    /// <summary>
    /// Shuts down the link.
    /// </summary>
    public void Disconnect()
    {
        ErrorUtilities.VerifyThrow(_packetPump != null, $"{nameof(_packetPump)} is null.");
        ErrorUtilities.VerifyThrow(_packetPump.ManagedThreadId != Thread.CurrentThread.ManagedThreadId, "Can't join on the same thread.");
        ErrorUtilities.VerifyThrow(_terminatePacketPump != null, $"{nameof(_terminatePacketPump)} is null.");
 
        _terminatePacketPump.Set();
        _packetPump.Join();
        _terminatePacketPump.Close();
        _pipeServer.Dispose();
        _packetPump = null;
        ChangeLinkStatus(LinkStatus.Inactive);
    }
 
    /// <summary>
    /// Sends data to the peer endpoint.
    /// </summary>
    /// <param name="packet">The packet to send.</param>
    public void SendData(INodePacket packet)
    {
        ErrorUtilities.VerifyThrowArgumentNull(packet);
 
        // PERF: Set up a priority system so logging packets are sent only when all other packet types have been sent.
        if (_status == LinkStatus.Active)
        {
            ErrorUtilities.VerifyThrow(_packetQueue != null, $"{nameof(_packetQueue)} is null");
            ErrorUtilities.VerifyThrow(_packetAvailable != null, $"{nameof(_packetAvailable)} is null");
 
            _packetQueue.Enqueue(packet);
            _packetAvailable.Set();
        }
    }
 
    /// <summary>
    /// Called when we are about to send last packet to finalize graceful disconnection with client.
    /// </summary>
    public void ClientWillDisconnect()
    {
        _isClientDisconnecting = true;
    }
 
    /// <summary>
    /// Updates the current link status if it has changed and notifies any registered delegates.
    /// </summary>
    /// <param name="newStatus">The status the node should now be in.</param>
    private void ChangeLinkStatus(LinkStatus newStatus)
    {
        ErrorUtilities.VerifyThrow(_status != newStatus, "Attempting to change status to existing status {0}.", _status);
        CommunicationsUtilities.Trace($"Changing link status from {_status} to {newStatus}");
        _status = newStatus;
        OnLinkStatusChanged?.Invoke(this, newStatus);
    }
 
    /// <summary>
    /// This method handles the asynchronous message pump.  It waits for messages to show up on the queue
    /// and calls FireDataAvailable for each such packet.  It will terminate when the terminate event is
    /// set.
    /// </summary>
    private void PacketPumpProc()
    {
        ErrorUtilities.VerifyThrow(_packetQueue != null, $"{nameof(_packetQueue)} is null");
        ErrorUtilities.VerifyThrow(_terminatePacketPump != null, $"{nameof(_terminatePacketPump)} is null");
        ErrorUtilities.VerifyThrow(_packetAvailable != null, $"{nameof(_packetAvailable)} is null");
 
        NamedPipeServerStream localPipeServer = _pipeServer;
 
        AutoResetEvent localPacketAvailable = _packetAvailable;
        AutoResetEvent localTerminatePacketPump = _terminatePacketPump;
        ConcurrentQueue<INodePacket> localPacketQueue = _packetQueue;
 
        DateTime originalWaitStartTime = DateTime.UtcNow;
        bool gotValidConnection = false;
        while (!gotValidConnection)
        {
            gotValidConnection = true;
            DateTime restartWaitTime = DateTime.UtcNow;
 
            // We only wait to wait the difference between now and the last original start time, in case we have multiple hosts attempting
            // to attach.  This prevents each attempt from resetting the timer.
            TimeSpan usedWaitTime = restartWaitTime - originalWaitStartTime;
            int waitTimeRemaining = Math.Max(0, CommunicationsUtilities.NodeConnectionTimeout - (int)usedWaitTime.TotalMilliseconds);
 
            try
            {
                // Wait for a connection
                IAsyncResult resultForConnection = localPipeServer.BeginWaitForConnection(null, null);
                CommunicationsUtilities.Trace($"Waiting for connection {waitTimeRemaining} ms...");
                bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false);
                if (!connected)
                {
                    CommunicationsUtilities.Trace("Connection timed out waiting a host to contact us.  Exiting comm thread.");
                    ChangeLinkStatus(LinkStatus.ConnectionFailed);
                    return;
                }
 
                CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent");
                localPipeServer.EndWaitForConnection(resultForConnection);
 
                // The handshake protocol is a series of int exchanges.  The host sends us a each component, and we
                // verify it. Afterwards, the host sends an "End of Handshake" signal, to which we respond in kind.
                // Once the handshake is complete, both sides can be assured the other is ready to accept data.
                Handshake handshake = new(CommunicationsUtilities.GetHandshakeOptions());
                try
                {
                    HandshakeComponents handshakeComponents = handshake.RetrieveHandshakeComponents();
 
                    int index = 0;
                    foreach (var component in handshakeComponents.EnumerateComponents())
                    {
                        byte? byteToAccept = index == 0 ? CommunicationsUtilities.HandshakeVersion : null;
 
                        if (!_pipeServer.TryReadIntForHandshake(
                            byteToAccept, /* this will disconnect a < 16.8 host; it expects leading 00 or F5 or 06. 0x00 is a wildcard */
                            out HandshakeResult result))
                        {
                            CommunicationsUtilities.Trace($"Handshake failed with error: {result.ErrorMessage}");
                        }
 
                        if (!IsHandshakePartValid(component, result.Value))
                        {
                            CommunicationsUtilities.Trace($"Handshake failed. Received {result.Value} from host  for {component.Key} but expected {component.Value}. Probably the host is a different MSBuild build.");
                            _pipeServer.WriteIntForHandshake(index + 1);
                            gotValidConnection = false;
                            break;
                        }
 
                        index++;
                    }
 
                    if (gotValidConnection)
                    {
                        // To ensure that our handshake and theirs have the same number of bytes, receive and send a magic number indicating EOS.
                        if (_pipeServer.TryReadEndOfHandshakeSignal(false, out _))
                        {
                            // Send supported PacketVersion after EndOfHandshakeSignal
                            // Based on this parent node decides how to communicate with the child.
                            if (_parentPacketVersion >= 2)
                            {
                                _pipeServer.WriteIntForHandshake(Handshake.PacketVersionFromChildMarker);  // Marker: PacketVersion follows
                                _pipeServer.WriteIntForHandshake(NodePacketTypeExtensions.PacketVersion);
                                CommunicationsUtilities.Trace($"Sent PacketVersion: {NodePacketTypeExtensions.PacketVersion}");
                            }
 
                            CommunicationsUtilities.Trace("Successfully connected to parent.");
                            _pipeServer.WriteEndOfHandshakeSignal();
 
                            // We will only talk to a host that was started by the same user as us.  Even though the pipe access is set to only allow this user, we want to ensure they
                            // haven't attempted to change those permissions out from under us.  This ensures that the only way they can truly gain access is to be impersonating the
                            // user we were started by.
                            WindowsIdentity currentIdentity = WindowsIdentity.GetCurrent();
                            WindowsIdentity? clientIdentity = null;
                            localPipeServer.RunAsClient(() => { clientIdentity = WindowsIdentity.GetCurrent(true); });
 
                            if (clientIdentity == null || !string.Equals(clientIdentity.Name, currentIdentity.Name, StringComparison.OrdinalIgnoreCase))
                            {
                                string clientIdentityName = clientIdentity != null ? clientIdentity.Name : "<unknown>";
                                CommunicationsUtilities.Trace($"Handshake failed. Host user is {clientIdentityName} but we were created by {currentIdentity.Name}.");
                                gotValidConnection = false;
                                continue;
                            }
                        }
                    }
                }
                catch (IOException e)
                {
                    // We will get here when:
                    // 1. The host (OOP main node) connects to us, it immediately checks for user privileges
                    //    and if they don't match it disconnects immediately leaving us still trying to read the blank handshake
                    // 2. The host is too old sending us bits we automatically reject in the handshake
                    // 3. We expected to read the EndOfHandshake signal, but we received something else
                    CommunicationsUtilities.Trace($"Client connection failed but we will wait for another connection. Exception: {e.Message}");
 
                    gotValidConnection = false;
                }
                catch (InvalidOperationException)
                {
                    gotValidConnection = false;
                }
 
                if (!gotValidConnection)
                {
                    if (localPipeServer.IsConnected)
                    {
                        localPipeServer.Disconnect();
                    }
 
                    continue;
                }
 
                ChangeLinkStatus(LinkStatus.Active);
            }
            catch (Exception e) when (!ExceptionHandling.IsCriticalException(e))
            {
                CommunicationsUtilities.Trace($"Client connection failed.  Exiting comm thread. {e}");
                if (localPipeServer.IsConnected)
                {
                    localPipeServer.Disconnect();
                }
 
                ExceptionHandling.DumpExceptionToFile(e);
                ChangeLinkStatus(LinkStatus.Failed);
                return;
            }
        }
 
        RunReadLoop(
            new BufferedReadStream(_pipeServer),
            _pipeServer,
            localPacketQueue,
            localPacketAvailable,
            localTerminatePacketPump);
 
        CommunicationsUtilities.Trace("Ending read loop");
 
        try
        {
            if (localPipeServer.IsConnected)
            {
                localPipeServer.WaitForPipeDrain();
                localPipeServer.Disconnect();
            }
        }
        catch (Exception)
        {
            // We don't really care if Disconnect somehow fails, but it gives us a chance to do the right thing.
        }
    }
 
    /// <summary>
    /// Method to verify that the handshake part received from the host matches the expected values.
    /// </summary>
    private static bool IsHandshakePartValid(KeyValuePair<string, int> component, int handshakePart)
    {
        if (handshakePart == component.Value)
        {
            return true;
        }
 
        CommunicationsUtilities.Trace($"Handshake failed. Received {handshakePart} from host for {component.Key} but expected {component.Value}. Probably the host is a different MSBuild build.");
 
        return false;
    }
 
    private void RunReadLoop(
        BufferedReadStream localReadPipe,
        NamedPipeServerStream localWritePipe,
        ConcurrentQueue<INodePacket> localPacketQueue,
        AutoResetEvent localPacketAvailable,
        AutoResetEvent localTerminatePacketPump)
    {
        ErrorUtilities.VerifyThrow(_packetFactory != null, $"{nameof(_packetFactory)} is null");
 
        INodePacketFactory packetFactory = _packetFactory;
 
        // Ordering of the wait handles is important.  The first signaled wait handle in the array
        // will be returned by WaitAny if multiple wait handles are signaled.  We prefer to have the
        // terminate event triggered so that we cannot get into a situation where packets are being
        // spammed to the endpoint and it never gets an opportunity to shutdown.
        CommunicationsUtilities.Trace("Entering read loop.");
        byte[] headerByte = new byte[5];
        ITranslator? writeTranslator = null;
        IAsyncResult result = localReadPipe.BeginRead(headerByte, offset: 0, headerByte.Length, callback: null, state: null);
 
        // Ordering is important.  We want packetAvailable to supercede terminate otherwise we will not properly wait for all
        // packets to be sent by other threads which are shutting down, such as the logging thread.
        WaitHandle[] handles =
        [
            result.AsyncWaitHandle,
            localPacketAvailable,
            localTerminatePacketPump,
        ];
 
        bool exitLoop = false;
        do
        {
            int waitId = WaitHandle.WaitAny(handles);
            switch (waitId)
            {
                case 0:
                    {
                        int bytesRead;
                        try
                        {
                            bytesRead = localReadPipe.EndRead(result);
                        }
                        catch (Exception e)
                        {
                            // Lost communications.  Abort (but allow node reuse)
                            CommunicationsUtilities.Trace($"Exception reading from server.  {e}");
                            ExceptionHandling.DumpExceptionToFile(e);
                            ChangeLinkStatus(LinkStatus.Inactive);
                            exitLoop = true;
                            break;
                        }
 
                        if (bytesRead != headerByte.Length)
                        {
                            // Incomplete read.  Abort.
                            if (bytesRead == 0)
                            {
                                if (_isClientDisconnecting)
                                {
                                    CommunicationsUtilities.Trace("Parent disconnected gracefully.");
 
                                    // Do not change link status to failed as this could make node think connection has failed
                                    // and recycle node, while this is perfectly expected and handled race condition
                                    // (both client and node is about to close pipe and client can be faster).
                                }
                                else
                                {
                                    CommunicationsUtilities.Trace("Parent disconnected abruptly.");
                                    ChangeLinkStatus(LinkStatus.Failed);
                                }
                            }
                            else
                            {
                                CommunicationsUtilities.Trace($"Incomplete header read from server.  {bytesRead} of {headerByte.Length} bytes read");
                                ChangeLinkStatus(LinkStatus.Failed);
                            }
 
                            exitLoop = true;
                            break;
                        }
 
                        // Check if this packet has an extended header that includes a version part.
                        byte rawType = headerByte[0];
                        bool hasExtendedHeader = NodePacketTypeExtensions.HasExtendedHeader(rawType);
                        NodePacketType packetType = hasExtendedHeader
                            ? NodePacketTypeExtensions.GetNodePacketType(rawType)
                            : (NodePacketType)rawType;
 
                        byte parentVersion = 0;
                        if (hasExtendedHeader)
                        {
                            parentVersion = NodePacketTypeExtensions.ReadVersion(localReadPipe);
                        }
 
                        try
                        {
                            ITranslator readTranslator = BinaryTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer);
 
                            // parent sends a packet version that is already negotiated during handshake.
                            readTranslator.NegotiatedPacketVersion = parentVersion;
                            packetFactory.DeserializeAndRoutePacket(0, packetType, readTranslator);
                        }
                        catch (Exception e)
                        {
                            // Error while deserializing or handling packet.  Abort.
                            CommunicationsUtilities.Trace($"Exception while deserializing packet {packetType}: {e}");
                            ExceptionHandling.DumpExceptionToFile(e);
                            ChangeLinkStatus(LinkStatus.Failed);
                            exitLoop = true;
                            break;
                        }
 
                        result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
 
                        handles[0] = result.AsyncWaitHandle;
                    }
 
                    break;
 
                case 1:
                case 2:
                    try
                    {
                        // Write out all the queued packets.
                        while (localPacketQueue.TryDequeue(out INodePacket? packet))
                        {
                            var packetStream = _packetStream;
                            packetStream.SetLength(0);
 
                            // Re-use writeTranslator; we clear _packetStream but never replace it.
                            // If _packetStream is ever reassigned, set writeTranslator = null first.
                            writeTranslator ??= BinaryTranslator.GetWriteTranslator(packetStream);
 
                            packetStream.WriteByte((byte)packet.Type);
 
                            // Pad for packet length
                            _binaryWriter.Write(0);
 
                            // Reset the position in the write buffer.
                            packet.Translate(writeTranslator);
 
                            int packetStreamLength = (int)packetStream.Position;
 
                            // Now write in the actual packet length
                            packetStream.Position = 1;
                            _binaryWriter.Write(packetStreamLength - 5);
 
                            localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
                        }
                    }
                    catch (Exception e)
                    {
                        // Error while deserializing or handling packet.  Abort.
                        CommunicationsUtilities.Trace($"Exception while serializing packets: {e}");
                        ExceptionHandling.DumpExceptionToFile(e);
                        ChangeLinkStatus(LinkStatus.Failed);
                        exitLoop = true;
                        break;
                    }
 
                    if (waitId == 2)
                    {
                        CommunicationsUtilities.Trace("Disconnecting voluntarily");
                        ChangeLinkStatus(LinkStatus.Failed);
                        exitLoop = true;
                    }
 
                    break;
 
                default:
                    ErrorUtilities.ThrowInternalError($"waitId {waitId} out of range.");
                    break;
            }
        }
        while (!exitLoop);
    }
}