|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Build.BackEnd.Logging;
#if NETFRAMEWORK
using Microsoft.Build.Eventing;
using System.Security.Principal;
#endif
using Microsoft.Build.Framework;
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
using Constants = Microsoft.Build.Framework.Constants;
#nullable disable
namespace Microsoft.Build.BackEnd
{
/// <summary>
/// Contains the shared pieces of code from NodeProviderOutOfProc
/// and NodeProviderOutOfProcTaskHost.
/// </summary>
internal abstract partial class NodeProviderOutOfProcBase
{
/// <summary>
/// The maximum number of bytes to write (1,048,576 bytes, i.e. 1 MiB).
/// NOTE(review): the code that consumes this limit is outside this view - presumably a single packet write; confirm.
/// </summary>
private const int MaxPacketWriteSize = 1048576;
/// <summary>
/// The number of times to retry creating an out-of-proc node.
/// </summary>
private const int NodeCreationRetries = 10;
/// <summary>
/// The amount of time, in milliseconds, to wait for an out-of-proc node to spool up before we give up.
/// Passed as the connect timeout when attaching to a freshly launched node.
/// </summary>
private const int TimeoutForNewNodeCreation = 30000;
/// <summary>
/// The amount of time to wait for an out-of-proc node to exit.
/// NOTE(review): presumably milliseconds, like <see cref="TimeoutForNewNodeCreation"/> - confirm at the usage site.
/// </summary>
private const int TimeoutForWaitForExit = 30000;
#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY
/// <summary>
/// The Windows identity of the current process, captured once when the type is initialized.
/// Its owner SID is compared against the owner of a remote pipe before that pipe is trusted.
/// </summary>
private static readonly WindowsIdentity s_currentWindowsIdentity = WindowsIdentity.GetCurrent();
#endif
/// <summary>
/// The build component host.
/// </summary>
private IBuildComponentHost _componentHost;
/// <summary>
/// Keeps track of the processes we've already checked for nodes so we don't check them again.
/// We decided to use ConcurrentDictionary of(string, byte) as common implementation of ConcurrentHashSet.
/// </summary>
private readonly ConcurrentDictionary<string, byte /*void*/> _processesToIgnore = new();
/// <summary>
/// Delegate used to tell the node provider that a context has been created.
/// </summary>
/// <param name="context">The created node context.</param>
internal delegate void NodeContextCreatedDelegate(NodeContext context);
/// <summary>
/// Delegate used to tell the node provider that a context has terminated.
/// </summary>
/// <param name="nodeId">The id of the node which terminated.</param>
internal delegate void NodeContextTerminateDelegate(int nodeId);
/// <summary>
/// Gets or sets the build component host used by this node provider.
/// </summary>
protected IBuildComponentHost ComponentHost
{
    get => _componentHost;
    set => _componentHost = value;
}
/// <summary>
/// Sends data to the specified node by forwarding the packet to the node's context.
/// </summary>
/// <param name="context">The node to which data shall be sent. It is dereferenced without a null check, so it must not be null.</param>
/// <param name="packet">The packet to send. Checked with <c>ErrorUtilities.VerifyThrowArgumentNull</c> before it is handed to the context.</param>
protected void SendData(NodeContext context, INodePacket packet)
{
// Validate the packet first so a bad argument is reported here rather than inside the context.
ErrorUtilities.VerifyThrowArgumentNull(packet);
context.SendData(packet);
}
/// <summary>
/// Tells every connected node that the build is complete, so that each one either
/// shuts down or resets itself for reuse.
/// </summary>
/// <param name="contextsToShutDown">The node contexts to notify. Null entries are skipped.</param>
/// <param name="enableReuse">Whether nodes are allowed to stay alive for reuse at all.</param>
protected void ShutdownConnectedNodes(List<NodeContext> contextsToShutDown, bool enableReuse)
{
    // Forget which processes were already probed.
    _processesToIgnore.Clear();

    // Child nodes are waited on so that they cannot change the terminal
    // after this process has terminated.
    bool waitForExit = !(enableReuse || Console.IsInputRedirected) &&
        Traits.Instance.EscapeHatches.EnsureStdOutForChildNodesIsPrimaryStdout;

    // Per-node reuse decision; entry k belongs to contextsToShutDown[k].
    bool[] reuseDecisions = DetermineNodesForReuse(contextsToShutDown.Count, enableReuse);

    // Exit tasks are only collected when we actually intend to block on them.
    List<Task> pendingExits = waitForExit && contextsToShutDown.Count > 0
        ? new List<Task>(contextsToShutDown.Count)
        : null;

    var loggingService = _componentHost.LoggingService;

    for (int position = 0; position < contextsToShutDown.Count; position++)
    {
        NodeContext current = contextsToShutDown[position];
        if (current is null)
        {
            continue;
        }

        // Send the build completion message, carrying this node's own reuse decision.
        current.SendData(new NodeBuildComplete(reuseDecisions[position]));

        if (pendingExits != null)
        {
            pendingExits.Add(current.WaitForExitAsync(loggingService));
        }
    }

    if (pendingExits != null && pendingExits.Count > 0)
    {
        Task.WaitAll(pendingExits.ToArray());
    }
}
/// <summary>
/// Asks every reachable node process on the machine to terminate for good.
/// </summary>
/// <param name="nodeReuse">The node-reuse flag used to build the handshake for the connection attempts.</param>
/// <param name="terminateNode">Delegate handed to each temporary node context so the provider learns when it terminates.</param>
protected void ShutdownAllNodes(bool nodeReuse, NodeContextTerminateDelegate terminateNode)
{
    // A 2013 comment suggested some nodes take this long to respond, so a smaller timeout would miss nodes.
    const int timeout = 30;

    INodePacketFactory packetFactory = new NodePacketFactory();

    // Candidate worker nodes first, then every MSBuildTaskHost instance.
    (_, IList<Process> runningNodes) = GetPossibleRunningNodes();
    List<Process> candidates = new List<Process>(runningNodes);

    string taskHostProcessName = Path.GetFileNameWithoutExtension(NodeProviderOutOfProcTaskHost.TaskHostNameForClr2TaskHost);
    candidates.AddRange(Process.GetProcessesByName(taskHostProcessName));

    foreach (Process candidate in candidates)
    {
        // Try the handshake without low priority first, then with it.
        Stream nodeStream =
            TryConnectToProcess(candidate.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, false), out HandshakeResult result)
            ?? TryConnectToProcess(candidate.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, true), out result);

        if (nodeStream is null)
        {
            // Not a node we can talk to; leave it alone.
            continue;
        }

        // We reached a node: request its termination and drop the connection.
        CommunicationsUtilities.Trace("Shutting down node with pid = {0}", candidate.Id);
        NodeContext terminationContext = new NodeContext(0, candidate, nodeStream, packetFactory, terminateNode, result.NegotiatedPacketVersion);
        terminationContext.SendData(new NodeBuildComplete(false /* no node reuse */));
        nodeStream.Dispose();
    }
}
/// <summary>
/// Finds or creates child processes which can act as nodes using the provided launch data.
/// </summary>
/// <remarks>
/// Node ids <paramref name="nextNodeId"/> .. <paramref name="nextNodeId"/> + <paramref name="numberOfNodesToCreate"/> - 1
/// are processed in parallel. For each id an idle existing node is reused when the handshake requests node reuse;
/// otherwise a new process is launched. Any exception raised while acquiring a node is collected and reported as a
/// single internal error after all ids have been processed.
/// </remarks>
/// <param name="nodeLaunchData">The launch configuration containing executable path, arguments, and environment overrides.</param>
/// <param name="nextNodeId">The next node ID to use.</param>
/// <param name="factory">The packet factory for communication.</param>
/// <param name="createNode">Callback when a node is created.</param>
/// <param name="terminateNode">Callback when a node terminates.</param>
/// <param name="numberOfNodesToCreate">Number of nodes to create.</param>
/// <returns>
/// The contexts of the nodes that were connected, in no particular order. The list can hold fewer entries than
/// requested when a node could be neither reused nor launched.
/// </returns>
protected IList<NodeContext> GetNodes(
    NodeLaunchData nodeLaunchData,
    int nextNodeId,
    INodePacketFactory factory,
    NodeContextCreatedDelegate createNode,
    NodeContextTerminateDelegate terminateNode,
    int numberOfNodesToCreate)
{
    string commandLineArgs = nodeLaunchData.CommandLineArgs;
#if DEBUG
    if (Execution.BuildManager.WaitForDebugger)
    {
        commandLineArgs += " /wfd";
    }
#endif
    // Resolve the executable: launch data first, then build parameters, then the MSBUILD_EXE_NAME override.
    var msbuildLocation = nodeLaunchData.MSBuildLocation;
    if (String.IsNullOrEmpty(msbuildLocation))
    {
        msbuildLocation = _componentHost.BuildParameters.NodeExeLocation;
    }
    if (String.IsNullOrEmpty(msbuildLocation))
    {
        string msbuildExeName = Environment.GetEnvironmentVariable("MSBUILD_EXE_NAME");
        if (!String.IsNullOrEmpty(msbuildExeName))
        {
            // we assume that MSBUILD_EXE_NAME is, in fact, just the name.
            // Append the extension directly: Path.Combine would insert a directory separator ("name\.exe").
            msbuildLocation = msbuildExeName + ".exe";
        }
    }
    bool nodeReuseRequested = Handshake.IsHandshakeOptionEnabled(nodeLaunchData.Handshake.HandshakeOptions, HandshakeOptions.NodeReuse);
    // Extract the expected NodeMode from the command line arguments
    NodeMode? expectedNodeMode = NodeModeHelper.ExtractFromCommandLine(commandLineArgs);
    // Get all process of possible running node processes for reuse and put them into ConcurrentQueue.
    // Processes from this queue will be concurrently consumed by TryReusePossibleRunningNodes while
    // trying to connect to them and reuse them. When queue is empty, no process to reuse left
    // new node process will be started.
    string expectedProcessName = null;
    ConcurrentQueue<Process> possibleRunningNodes = null;
    // Try to connect to idle nodes if node reuse is enabled.
    if (nodeReuseRequested)
    {
        IList<Process> possibleRunningNodesList;
        (expectedProcessName, possibleRunningNodesList) = GetPossibleRunningNodes(msbuildLocation, expectedNodeMode);
        possibleRunningNodes = new ConcurrentQueue<Process>(possibleRunningNodesList);
        if (possibleRunningNodesList.Count > 0)
        {
            CommunicationsUtilities.Trace("Attempting to connect to {1} existing processes '{0}'...", expectedProcessName, possibleRunningNodesList.Count);
        }
    }
    ConcurrentQueue<NodeContext> nodeContexts = new();
    ConcurrentQueue<Exception> exceptions = new();
    int currentProcessId = EnvironmentUtilities.CurrentProcessId;
    Parallel.For(nextNodeId, nextNodeId + numberOfNodesToCreate, (nodeId) =>
    {
        try
        {
            if (nodeReuseRequested && TryReuseAnyFromPossibleRunningNodes(currentProcessId, nodeId))
            {
                return;
            }
            if (!StartNewNode(nodeId))
            {
                // We were unable to reuse or launch a node.
                CommunicationsUtilities.Trace("FAILED TO CONNECT TO A CHILD NODE");
            }
        }
        catch (Exception ex)
        {
            // It will be rethrown as aggregate exception
            exceptions.Enqueue(ex);
        }
    });
    if (!exceptions.IsEmpty)
    {
        ErrorUtilities.ThrowInternalError(
            $"Cannot acquire required number of nodes. MSBuildLocation: '{msbuildLocation}', CommandLineArgs: '{commandLineArgs}', NumberOfNodesToCreate: {numberOfNodesToCreate}, NextNodeId: {nextNodeId}.",
            new AggregateException(exceptions.ToArray()));
    }
    return nodeContexts.ToList();

    // Drains the shared candidate queue until one existing process accepts the handshake for this node id.
    bool TryReuseAnyFromPossibleRunningNodes(int currentProcessId, int nodeId)
    {
        while (possibleRunningNodes != null && possibleRunningNodes.TryDequeue(out var nodeToReuse))
        {
            CommunicationsUtilities.Trace("Trying to connect to existing process {2} with id {1} to establish node {0}...", nodeId, nodeToReuse.Id, nodeToReuse.ProcessName);
            // Never try to connect to ourselves.
            if (nodeToReuse.Id == currentProcessId)
            {
                continue;
            }
            // Get the full context of this inspection so that we can always skip this process when we have the same taskhost context
            string nodeLookupKey = GetProcessesToIgnoreKey(nodeLaunchData.Handshake, nodeToReuse.Id);
            if (_processesToIgnore.ContainsKey(nodeLookupKey))
            {
                continue;
            }
            // We don't need to check this again
            _processesToIgnore.TryAdd(nodeLookupKey, default);
            // Attempt to connect to each process in turn.
            Stream nodeStream = TryConnectToProcess(nodeToReuse.Id, 0 /* poll, don't wait for connections */, nodeLaunchData.Handshake, out HandshakeResult result);
            if (nodeStream != null)
            {
                // Connection successful, use this node.
                CommunicationsUtilities.Trace("Successfully connected to existed node {0} which is PID {1}", nodeId, nodeToReuse.Id);
                string msg = ResourceUtilities.FormatResourceStringIgnoreCodeAndKeyword("NodeReused", nodeId, nodeToReuse.Id);
                _componentHost.LoggingService.LogBuildEvent(new BuildMessageEventArgs(msg, null, null, MessageImportance.Low)
                {
                    BuildEventContext = new BuildEventContext(nodeId, BuildEventContext.InvalidTargetId, BuildEventContext.InvalidProjectContextId, BuildEventContext.InvalidTaskId)
                });
                CreateNodeContext(nodeId, nodeToReuse, nodeStream, result.NegotiatedPacketVersion);
                return true;
            }
        }
        return false;
    }

    // Create a new node process.
    bool StartNewNode(int nodeId)
    {
        CommunicationsUtilities.Trace("Could not connect to existing process, now creating a process...");
        // We try this in a loop because it is possible that there is another MSBuild multiproc
        // host process running somewhere which is also trying to create nodes right now. It might
        // find our newly created node and connect to it before we get a chance.
        int retries = NodeCreationRetries;
        while (retries-- > 0)
        {
#if FEATURE_NET35_TASKHOST
            // We will also check to see if .NET 3.5 is installed in the case where we need to launch a CLR2 OOP TaskHost.
            // Failure to detect this has been known to stall builds when Windows pops up a related dialog.
            // It's also a waste of time when we attempt several times to launch multiple MSBuildTaskHost.exe (CLR2 TaskHost)
            // nodes because we should never be able to connect in this case.
            string taskHostNameForClr2TaskHost = Path.GetFileNameWithoutExtension(NodeProviderOutOfProcTaskHost.TaskHostNameForClr2TaskHost);
            if (Path.GetFileNameWithoutExtension(msbuildLocation).Equals(taskHostNameForClr2TaskHost, StringComparison.OrdinalIgnoreCase))
            {
                if (FrameworkLocationHelper.GetPathToDotNetFrameworkV35(DotNetFrameworkArchitecture.Current) == null)
                {
                    CommunicationsUtilities.Trace(
                        "Failed to launch node from {0}. The required .NET Framework v3.5 is not installed or enabled. CommandLine: {1}",
                        msbuildLocation,
                        commandLineArgs);
                    string nodeFailedToLaunchError = ResourceUtilities.GetResourceString("TaskHostNodeFailedToLaunchErrorCodeNet35NotInstalled");
                    throw new NodeFailedToLaunchException(null, nodeFailedToLaunchError);
                }
            }
#endif
            // Create the node process
            INodeLauncher nodeLauncher = (INodeLauncher)_componentHost.GetComponent(BuildComponentType.NodeLauncher);
            NodeLaunchData launchData = new(msbuildLocation, commandLineArgs, nodeLaunchData.Handshake, nodeLaunchData.EnvironmentOverrides);
            Process msbuildProcess = nodeLauncher.Start(launchData, nodeId);
            // The process we just launched must not be picked up later as a reuse candidate.
            _processesToIgnore.TryAdd(GetProcessesToIgnoreKey(nodeLaunchData.Handshake, msbuildProcess.Id), default);
            // Note, when running under IMAGEFILEEXECUTIONOPTIONS registry key to debug, the process ID
            // gotten back from CreateProcess is that of the debugger, which causes this to try to connect
            // to the debugger process. Instead, use MSBUILDDEBUGONSTART=1
            // Now try to connect to it.
            Stream nodeStream = TryConnectToProcess(msbuildProcess.Id, TimeoutForNewNodeCreation, nodeLaunchData.Handshake, out HandshakeResult result);
            if (nodeStream != null)
            {
                // Connection successful, use this node.
                CommunicationsUtilities.Trace("Successfully connected to created node {0} which is PID {1}", nodeId, msbuildProcess.Id);
                CreateNodeContext(nodeId, msbuildProcess, nodeStream, result.NegotiatedPacketVersion);
                return true;
            }
            if (msbuildProcess.HasExited)
            {
                if (Traits.Instance.DebugNodeCommunication)
                {
                    try
                    {
                        CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it has exited with exit code {1}. This can indicate a crash at startup", msbuildProcess.Id, msbuildProcess.ExitCode);
                    }
                    catch (InvalidOperationException)
                    {
                        // This case is common on Windows where we called CreateProcess and the Process object
                        // can't get the exit code.
                        CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it has exited with unknown exit code. This can indicate a crash at startup", msbuildProcess.Id);
                    }
                }
            }
            else
            {
                CommunicationsUtilities.Trace("Could not connect to node with PID {0}; it is still running. This can occur when two multiprocess builds run in parallel and the other one 'stole' this node", msbuildProcess.Id);
            }
        }
        return false;
    }

    // Wraps a connected process in a NodeContext, records it for the caller and notifies the provider.
    void CreateNodeContext(int nodeId, Process nodeToReuse, Stream nodeStream, byte negotiatedVersion)
    {
        NodeContext nodeContext = new(nodeId, nodeToReuse, nodeStream, factory, terminateNode, negotiatedVersion, nodeLaunchData.Handshake.HandshakeOptions);
        nodeContexts.Enqueue(nodeContext);
        createNode(nodeContext);
    }
}
/// <summary>
/// Finds processes that could be reusable MSBuild nodes.
/// </summary>
/// <param name="msbuildLocation">
/// The location of the MSBuild executable used to derive the expected process name. When its file name is the
/// MSBuild executable name it is used directly; otherwise the current host (if any) takes precedence.
/// May be null or empty, in which case the MSBuild executable name is the last-resort fallback.
/// </param>
/// <param name="expectedNodeMode">The NodeMode to filter for, or null to include all.</param>
/// <returns>
/// Item 1 is a descriptive name of the processes being searched for.
/// Item 2 is the list of matching processes, sorted by ID. It is empty when process enumeration fails.
/// </returns>
private (string expectedProcessName, IList<Process> nodeProcesses) GetPossibleRunningNodes(
    string msbuildLocation = null,
    NodeMode? expectedNodeMode = null)
{
    bool isNativeHost = msbuildLocation != null && Path.GetFileName(msbuildLocation).Equals(Constants.MSBuildExecutableName, StringComparison.OrdinalIgnoreCase);
    string hostPath = isNativeHost ? msbuildLocation : (CurrentHost.GetCurrentHost() ?? msbuildLocation);
    if (string.IsNullOrEmpty(hostPath))
    {
        // Neither a location nor a current host is known. A null/empty name would not identify
        // MSBuild processes when handed to Process.GetProcessesByName, so search by the default name.
        hostPath = Constants.MSBuildExecutableName;
    }
    string expectedProcessName = Path.GetFileNameWithoutExtension(hostPath);
    Process[] processes;
    try
    {
        processes = Process.GetProcessesByName(expectedProcessName);
    }
    catch
    {
        // Process enumeration can fail due to permissions or transient OS errors.
        return (expectedProcessName, Array.Empty<Process>());
    }
    // Filtering by NodeMode is gated behind the 18.5 change wave.
    bool shouldFilterByNodeMode = expectedNodeMode.HasValue && ChangeWaves.AreFeaturesEnabled(ChangeWaves.Wave18_5);
    if (shouldFilterByNodeMode)
    {
        return (expectedProcessName, FilterProcessesByNodeMode(processes, expectedNodeMode.Value, expectedProcessName));
    }
    Array.Sort(processes, static (left, right) => left.Id.CompareTo(right.Id));
    return (expectedProcessName, processes);
}
/// <summary>
/// Narrows a set of candidate processes down to those whose command line carries the expected NodeMode.
/// A process whose command line is reported as available but null (retrieval unsupported on the platform)
/// is kept, so node reuse keeps working there. A process whose command line cannot be retrieved, does not
/// match, or triggers an exception is dropped.
/// </summary>
/// <param name="processes">The candidate processes.</param>
/// <param name="expectedNodeMode">The NodeMode a process must have been started with.</param>
/// <param name="expectedProcessName">The process name being searched for; used for tracing only.</param>
/// <returns>The retained processes, sorted by process ID.</returns>
private static IList<Process> FilterProcessesByNodeMode(Process[] processes, NodeMode expectedNodeMode, string expectedProcessName)
{
    CommunicationsUtilities.Trace("Filtering {0} candidate processes by NodeMode {1} for process name '{2}'", processes.Length, expectedNodeMode, expectedProcessName);

    var matches = new List<Process>(processes.Length);
    for (int index = 0; index < processes.Length; index++)
    {
        Process candidate = processes[index];
        try
        {
            bool commandLineAvailable = candidate.TryGetCommandLine(out string commandLine);
            if (!commandLineAvailable)
            {
                CommunicationsUtilities.Trace("Skipping process {0} - unable to retrieve command line", candidate.Id);
            }
            else if (commandLine is null)
            {
                // No command-line support on this platform: keep the candidate rather than silently breaking reuse.
                CommunicationsUtilities.Trace("Including process {0} - command line retrieval not supported on this platform", candidate.Id);
                matches.Add(candidate);
            }
            else
            {
                NodeMode? candidateMode = NodeModeHelper.ExtractFromCommandLine(commandLine);
                if (candidateMode != expectedNodeMode)
                {
                    CommunicationsUtilities.Trace(
                        "Skipping process {0} - NodeMode mismatch. Expected: {1}, Found: {2}. Command line: {3}",
                        candidate.Id, expectedNodeMode,
                        candidateMode?.ToString() ?? "<null>",
                        commandLine);
                }
                else
                {
                    CommunicationsUtilities.Trace("Including process {0} with matching NodeMode {1}", candidate.Id, candidateMode.Value);
                    matches.Add(candidate);
                }
            }
        }
        catch (Exception ex)
        {
            CommunicationsUtilities.Trace("Skipping process {0} - error retrieving command line: {1}", candidate.Id, ex.Message);
        }
    }

    matches.Sort(static (first, second) => first.Id.CompareTo(second.Id));
    CommunicationsUtilities.Trace("Filtered to {0} processes matching NodeMode {1}", matches.Count, expectedNodeMode);
    return matches;
}
/// <summary>
/// Generates the key under which a (handshake, process) pair is recorded in <see cref="_processesToIgnore"/>,
/// i.e. processes we have already attempted to connect to or are already connected to.
/// The key is the handshake's string form and the process id joined by '|', formatted with the invariant culture.
/// </summary>
/// <param name="hostHandshake">The handshake (task host context) the connection attempt is made with.</param>
/// <param name="nodeProcessId">The id of the remote process.</param>
/// <returns>The lookup key.</returns>
private string GetProcessesToIgnoreKey(Handshake hostHandshake, int nodeProcessId) =>
#if NET
string.Create(CultureInfo.InvariantCulture, $"{hostHandshake}|{nodeProcessId}");
#else
$"{hostHandshake}|{nodeProcessId.ToString(CultureInfo.InvariantCulture)}";
#endif
/// <summary>
/// Decides, per node of this MSBuild instance, whether it should stay alive for reuse, taking the
/// system-wide node count into account so the machine is not over-provisioned.
/// </summary>
/// <param name="nodeCount">The number of nodes in this MSBuild instance.</param>
/// <param name="enableReuse">Whether reuse is enabled at all.</param>
/// <returns>An array of length <paramref name="nodeCount"/>; true means reuse, false means terminate.</returns>
protected virtual bool[] DetermineNodesForReuse(int nodeCount, bool enableReuse)
{
    // Starts out as "terminate everything".
    bool[] reuseFlags = new bool[nodeCount];

    if (!enableReuse)
    {
        return reuseFlags;
    }

    int threshold = GetNodeReuseThreshold();
    if (threshold == 0)
    {
        CommunicationsUtilities.Trace("Node reuse threshold is 0, terminating all {0} nodes", nodeCount);
        return reuseFlags;
    }

    int activeSystemWide = CountSystemWideActiveNodes();
    CommunicationsUtilities.Trace("System-wide node count: {0}, threshold: {1}, this instance has: {2} nodes",
        activeSystemWide, threshold, nodeCount);

    int keepCount;
    if (activeSystemWide <= threshold)
    {
        // Not over the limit: every node of this instance may stay.
        keepCount = nodeCount;
    }
    else
    {
        // Over-provisioned: this instance only keeps what is needed to reach the threshold,
        // given the nodes that belong to other instances.
        int nodesOwnedElsewhere = activeSystemWide - nodeCount;
        int allowedHere = Math.Max(0, threshold - nodesOwnedElsewhere);
        CommunicationsUtilities.Trace("Keeping {0} of {1} nodes in this instance to help meet threshold of {2}",
            allowedHere, nodeCount, threshold);
        keepCount = Math.Min(allowedHere, nodeCount);
    }

    // The first keepCount nodes are the ones retained.
    for (int index = 0; index < keepCount; index++)
    {
        reuseFlags[index] = true;
    }

    return reuseFlags;
}
/// <summary>
/// Returns how many nodes of this type may remain active system-wide.
/// </summary>
/// <returns>One and a half times the logical core count (integer division), but never less than 1.</returns>
protected virtual int GetNodeReuseThreshold()
{
    // Worker-node default: 1.5 * NUM_PROCS, i.e. more nodes than a single build would create.
    int logicalCores = NativeMethodsShared.GetLogicalCoreCount();
    int oneAndAHalfTimesCores = (3 * logicalCores) / 2;
    return Math.Max(1, oneAndAHalfTimesCores);
}
/// <summary>
/// Counts the active MSBuild node processes of this provider's type across the whole system.
/// The base implementation counts processes running as <see cref="NodeMode.OutOfProcNode"/>.
/// </summary>
/// <returns>The count of active node processes.</returns>
protected virtual int CountSystemWideActiveNodes()
{
    return CountActiveNodesWithMode(NodeMode.OutOfProcNode);
}
/// <summary>
/// Counts the active MSBuild processes that run with the given <see cref="NodeMode"/>.
/// The current process is part of the count when it matches.
/// Used by out-of-proc nodes (e.g., server node) to detect over-provisioning at build completion.
/// </summary>
/// <param name="nodeMode">The node mode to filter for.</param>
/// <returns>The number of matching processes, or 0 when counting throws.</returns>
internal static int CountActiveNodesWithMode(NodeMode nodeMode)
{
    try
    {
        IList<Process> matches = GetPossibleRunningNodes(nodeMode).nodeProcesses;

        // Only the number is needed; release every Process object right away.
        for (int index = 0; index < matches.Count; index++)
        {
            matches[index]?.Dispose();
        }

        return matches.Count;
    }
    catch (Exception ex)
    {
        CommunicationsUtilities.Trace("Error counting system-wide nodes with mode {0}: {1}", nodeMode, ex.Message);
        return 0;
    }
}
/// <summary>
/// Enumerates processes named after the MSBuild application and, when a NodeMode is supplied and the
/// 18.5 change wave is enabled, keeps only those whose command line carries that NodeMode.
/// A process whose command line is reported as available but null is kept; one whose command line
/// cannot be retrieved, or whose inspection throws, is dropped.
/// </summary>
/// <param name="expectedNodeMode">The NodeMode to filter for, or null to include all.</param>
/// <returns>
/// The process name that was searched for and the matching processes sorted by ID.
/// The list is empty when process enumeration fails.
/// </returns>
private static (string expectedProcessName, IList<Process> nodeProcesses) GetPossibleRunningNodes(NodeMode? expectedNodeMode)
{
    string expectedProcessName = Constants.MSBuildAppName;

    Process[] candidates;
    try
    {
        candidates = Process.GetProcessesByName(expectedProcessName);
    }
    catch
    {
        return (expectedProcessName, Array.Empty<Process>());
    }

    bool filterByMode = expectedNodeMode.HasValue && ChangeWaves.AreFeaturesEnabled(ChangeWaves.Wave18_5);
    if (!filterByMode)
    {
        Array.Sort(candidates, (first, second) => first.Id.CompareTo(second.Id));
        return (expectedProcessName, candidates);
    }

    List<Process> matches = new List<Process>();
    foreach (Process candidate in candidates)
    {
        try
        {
            if (candidate.TryGetCommandLine(out string commandLine)
                && (commandLine is null || NodeModeHelper.ExtractFromCommandLine(commandLine) == expectedNodeMode.Value))
            {
                matches.Add(candidate);
            }
        }
        catch
        {
            // Inspection failed for this candidate; leave it out and move on.
        }
    }

    matches.Sort((first, second) => first.Id.CompareTo(second.Id));
    return (expectedProcessName, matches);
}
#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY
// This code needs to be in a separate method so that we don't try (and fail) to load the Windows-only APIs when JIT-ing the code
// on non-Windows operating systems
/// <summary>
/// Verifies that the owner of the connected pipe is the owner of the current Windows identity.
/// </summary>
/// <param name="nodeStream">The connected client pipe whose security descriptor is inspected.</param>
/// <exception cref="UnauthorizedAccessException">Thrown when the pipe owner differs from the current identity's owner.</exception>
private static void ValidateRemotePipeSecurityOnWindows(NamedPipeClientStream nodeStream)
{
// The SID we expect to own the pipe.
SecurityIdentifier identifier = s_currentWindowsIdentity.Owner;
#if FEATURE_PIPE_SECURITY
PipeSecurity remoteSecurity = nodeStream.GetAccessControl();
#else
// Without GetAccessControl, build the security object from the pipe handle (access, owner and group sections).
var remoteSecurity = new PipeSecurity(nodeStream.SafePipeHandle, System.Security.AccessControl.AccessControlSections.Access |
System.Security.AccessControl.AccessControlSections.Owner | System.Security.AccessControl.AccessControlSections.Group);
#endif
IdentityReference remoteOwner = remoteSecurity.GetOwner(typeof(SecurityIdentifier));
if (remoteOwner != identifier)
{
CommunicationsUtilities.Trace("The remote pipe owner {0} does not match {1}", remoteOwner.Value, identifier.Value);
throw new UnauthorizedAccessException();
}
}
#endif
/// <summary>
/// Attempts to connect to the node pipe of the specified process and complete the handshake.
/// </summary>
/// <param name="nodeProcessId">The id of the process whose pipe is targeted.</param>
/// <param name="timeout">The connect timeout in milliseconds.</param>
/// <param name="handshake">The handshake to present to the node.</param>
/// <param name="result">The handshake outcome; a generic failure when a non-critical exception interrupted the attempt.</param>
/// <returns>The connected stream, or null when the connection or handshake failed (the stream is disposed in that case).</returns>
private Stream TryConnectToProcess(int nodeProcessId, int timeout, Handshake handshake, out HandshakeResult result)
{
    string pipeName = NamedPipeUtil.GetPlatformSpecificPipeName(nodeProcessId);

#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY
    PipeOptions pipeOptions = PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly;
#else
    PipeOptions pipeOptions = PipeOptions.Asynchronous;
#endif
    NamedPipeClientStream nodeStream = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, pipeOptions);

    CommunicationsUtilities.Trace("Attempting connect to PID {0} with pipe {1} with timeout {2} ms", nodeProcessId, pipeName, timeout);

    try
    {
        if (TryConnectToPipeStream(nodeStream, pipeName, handshake, timeout, out result))
        {
            return nodeStream;
        }

        CommunicationsUtilities.Trace("Failed to connect to pipe {0}. {1}", pipeName, result.ErrorMessage.TrimEnd());
        nodeStream.Dispose();
        return null;
    }
    catch (Exception e) when (!ExceptionHandling.IsCriticalException(e))
    {
        // Typical causes:
        //   UnauthorizedAccessException - couldn't connect, might not be a node.
        //   IOException                 - couldn't connect, already in use.
        //   TimeoutException            - couldn't connect, might not be a node.
        //   InvalidOperationException   - couldn't connect, probably a different build.
        CommunicationsUtilities.Trace("Failed to connect to pipe {0}. {1}", pipeName, e.Message.TrimEnd());

        // Leaving the stream open might hang up the child.
        nodeStream.Dispose();

        result = HandshakeResult.Failure(HandshakeStatus.Undefined, "Check the COMM traces to diagnose the issue with communication.");
        return null;
    }
}
/// <summary>
/// Connects to a named pipe stream, validates the pipe owner where required, and performs the handshake.
/// </summary>
/// <remarks>
/// Reused by MSBuild server client <see cref="Experimental.MSBuildClient"/>.
/// </remarks>
/// <param name="nodeStream">The (not yet connected) client pipe stream.</param>
/// <param name="pipeName">The pipe name; used for tracing only.</param>
/// <param name="handshake">The handshake whose components are written to the pipe.</param>
/// <param name="timeout">The connect timeout in milliseconds.</param>
/// <param name="result">On success, a success result carrying the negotiated packet version; otherwise the failure reported while reading the end-of-handshake signal.</param>
/// <returns>True when the endpoint completed the handshake; otherwise false.</returns>
internal static bool TryConnectToPipeStream(NamedPipeClientStream nodeStream, string pipeName, Handshake handshake, int timeout, out HandshakeResult result)
{
    nodeStream.Connect(timeout);

#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY
    if (NativeMethodsShared.IsWindows)
    {
        // The pipe must be owned by us. Otherwise a faked-up node with permissive ACLs could let us attach
        // and then feed fake build requests back, making us run harmful or unexpected builds. A normal,
        // unprivileged process can only set the pipe owner to the user's own SID; anything able to set a
        // different owner could do the same to other objects, which would be a flaw upstream of us.
        ValidateRemotePipeSecurityOnWindows(nodeStream);
    }
#endif

    // Send our side of the handshake, one component at a time.
    HandshakeComponents parts = handshake.RetrieveHandshakeComponents();
    foreach (var part in parts.EnumerateComponents())
    {
        CommunicationsUtilities.Trace("Writing handshake part {0} ({1}) to pipe {2}", part.Key, part.Value, pipeName);
        nodeStream.WriteIntForHandshake(part.Value);
    }

    // Signal that all parts of our handshake are written; hopefully the endpoint has finished as well.
    nodeStream.WriteEndOfHandshakeSignal();

    CommunicationsUtilities.Trace("Reading handshake from pipe {0}", pipeName);
    bool endpointAccepted = nodeStream.TryReadEndOfHandshakeSignal(
        true,
#if NETCOREAPP2_1_OR_GREATER
        timeout,
#endif
        out HandshakeResult innerResult);

    if (!endpointAccepted)
    {
        result = innerResult;
        return false;
    }

    // We got a connection.
    CommunicationsUtilities.Trace("Successfully connected to pipe {0}...!", pipeName);
    result = HandshakeResult.Success(0, innerResult.NegotiatedPacketVersion);
    return true;
}
/// <summary>
/// Class which wraps up the communications infrastructure for a given node.
/// </summary>
internal sealed class NodeContext
{
/// <summary>
/// The states recorded in <see cref="_exitPacketState"/>, which tracks the packet sent to terminate the node.
/// </summary>
// NOTE(review): the member descriptions below are inferred from the member names; the code that
// performs the transitions is outside this view - confirm.
private enum ExitPacketState
{
// Presumably: no exit packet has been queued or sent.
None,
// Presumably: an exit packet is waiting in the write queue.
ExitPacketQueued,
// Presumably: the exit packet has been written to the pipe.
ExitPacketSent
}
// The pipe(s) used to communicate with the node.
private readonly Stream _pipeStream;
/// <summary>
/// The factory used to create packets from data read off the pipe.
/// </summary>
private readonly INodePacketFactory _packetFactory;
/// <summary>
/// The node id assigned by the node provider.
/// </summary>
private readonly int _nodeId;
/// <summary>
/// The node process.
/// </summary>
private readonly Process _process;
/// <summary>
/// Gets the node process this context is attached to.
/// </summary>
internal Process Process { get { return _process; } }
/// <summary>
/// A 5-byte array that receives the header of each packet when read:
/// 1 byte for the packet type followed by 4 bytes for the body length.
/// </summary>
private readonly byte[] _headerByte;
/// <summary>
/// Handshake options used to connect to the node.
/// </summary>
private HandshakeOptions _handshakeOptions;
/// <summary>
/// A buffer typically big enough to handle a packet body.
/// We use this as a convenient way to manage and cache a byte[] that's resized
/// automatically to fit our payload.
/// </summary>
private readonly MemoryStream _readBufferMemoryStream;
/// <summary>
/// A reusable buffer for writing packets.
/// </summary>
private readonly MemoryStream _writeBufferMemoryStream;
/// <summary>
/// Translator created over <see cref="_readBufferMemoryStream"/>; used to deserialize incoming packets.
/// </summary>
private readonly ITranslator _readTranslator;
/// <summary>
/// Translator created over <see cref="_writeBufferMemoryStream"/>; used to serialize outgoing packets.
/// </summary>
private readonly ITranslator _writeTranslator;
#if FEATURE_APM
// Cached delegates for the pipe stream read callbacks (header and body),
// created once in the constructor and passed to every BeginRead call.
private readonly AsyncCallback _headerReadCompleteCallback;
private readonly AsyncCallback _bodyReadCompleteCallback;
#endif
/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private readonly ConcurrentQueue<INodePacket> _packetWriteQueue;
/// <summary>
/// Delegate called when the context terminates.
/// </summary>
private readonly NodeContextTerminateDelegate _terminateDelegate;
/// <summary>
/// A dedicated thread to consume enqueued packets.
/// </summary>
private readonly Thread _drainPacketQueueThread;
/// <summary>
/// Used to signal the consuming thread that a packet has been enqueued.
/// </summary>
private readonly AutoResetEvent _packetEnqueued;
/// <summary>
/// Used to signal that the exit packet has been sent and we no longer need to wait for the queue to drain.
/// </summary>
private readonly CancellationTokenSource _packetQueueDrainDelayCancellation;
/// <summary>
/// Tracks the state of the packet sent to terminate the node.
/// </summary>
private ExitPacketState _exitPacketState;
/// <summary>
/// The minimum packet version supported by both the host and the node.
/// </summary>
private readonly byte _negotiatedPacketVersion;
#if FEATURE_APM
// Body length of the packet currently being read. It is stored by the header-read callback and
// consumed by the body-read callback, to avoid allocations due to passing state through BeginRead.
private int _currentPacketLength;
#endif
/// <summary>
/// Creates the context for a connected node, allocates its reusable read/write buffers
/// and starts the background thread that writes queued packets to the node.
/// </summary>
/// <param name="nodeId">Id assigned to the node.</param>
/// <param name="process">The node process.</param>
/// <param name="nodePipe">Stream used to exchange packets with the node.</param>
/// <param name="factory">Factory that deserializes and routes packets received from the node.</param>
/// <param name="terminateDelegate">Delegate invoked with the node id when the context is closed.</param>
/// <param name="negotiatedVersion">Packet version negotiated between the host and the node.</param>
/// <param name="handshakeOptions">Handshake options used to connect to the node.</param>
public NodeContext(
    int nodeId,
    Process process,
    Stream nodePipe,
    INodePacketFactory factory,
    NodeContextTerminateDelegate terminateDelegate,
    byte negotiatedVersion,
    HandshakeOptions handshakeOptions = HandshakeOptions.None)
{
    // Identity of the node and the collaborators handed to us.
    _nodeId = nodeId;
    _process = process;
    _pipeStream = nodePipe;
    _packetFactory = factory;
    _terminateDelegate = terminateDelegate;
    _handshakeOptions = handshakeOptions;
    _negotiatedPacketVersion = negotiatedVersion;

    // Read side: header buffer, body buffer and the translator layered on top of it.
    _headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
    _readBufferMemoryStream = new MemoryStream();
    _readTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer());
#if FEATURE_APM
    _headerReadCompleteCallback = HeaderReadComplete;
    _bodyReadCompleteCallback = BodyReadComplete;
    _currentPacketLength = 0;
#endif

    // Write side: serialization buffer, pending-packet queue and its signalling primitives.
    _writeBufferMemoryStream = new MemoryStream();
    _writeTranslator = BinaryTranslator.GetWriteTranslator(_writeBufferMemoryStream);
    _packetWriteQueue = new ConcurrentQueue<INodePacket>();
    _packetEnqueued = new AutoResetEvent(false);
    _packetQueueDrainDelayCancellation = new CancellationTokenSource();

    // We select a thread size empirically - for debug builds the minimum possible stack size was too small.
    // The current size is reported to not have the issue.
    _drainPacketQueueThread = new Thread(DrainPacketQueue, 0x30000)
    {
        Name = "DrainPacketQueueThread",
        IsBackground = true,
    };

    // Start last, so that the thread only ever observes a fully initialized context.
    _drainPacketQueueThread.Start(this);
}
/// <summary>
/// Gets the id of the node this context communicates with.
/// </summary>
public int NodeId
{
    get { return _nodeId; }
}
/// <summary>
/// Kicks off asynchronous reading of packets from this node.
/// </summary>
/// <remarks>
/// With FEATURE_APM a single header read is started and its completion callback continues from there.
/// Otherwise the task-based read loop is launched from a thread pool work item.
/// </remarks>
public void BeginAsyncPacketRead()
{
#if FEATURE_APM
    byte[] header = _headerByte;
    _pipeStream.BeginRead(header, 0, header.Length, _headerReadCompleteCallback, this);
#else
    ThreadPool.QueueUserWorkItem(unusedState =>
    {
        // Fire and forget: the returned task is intentionally not awaited.
        _ = RunPacketReadLoopAsync();
    });
#endif
}
#if !FEATURE_APM
/// <summary>
/// Reads packets from the node until the connection fails or a
/// <see cref="NodePacketType.NodeShutdown"/> packet has been routed.
/// </summary>
/// <remarks>
/// Each iteration reads the 5-byte header (1 byte packet type, 4 bytes little-endian body length),
/// then the body, and finally deserializes and routes the packet. When an <see cref="IOException"/>
/// is thrown while reading, a <see cref="NodeShutdown"/> packet with
/// <see cref="NodeShutdownReason.ConnectionFailed"/> is routed and the context is closed.
/// </remarks>
/// <returns>A task that completes when the read loop stops.</returns>
public async Task RunPacketReadLoopAsync()
{
    while (true)
    {
        try
        {
            // A single ReadAsync call may return fewer bytes than requested, so keep reading until the
            // whole header has arrived or the stream reports end-of-stream (0 bytes). Without the loop a
            // short read would be misreported as a communications error.
            int headerBytesRead = 0;
            while (headerBytesRead < _headerByte.Length)
            {
                int bytesRead = await _pipeStream.ReadAsync(_headerByte.AsMemory(headerBytesRead, _headerByte.Length - headerBytesRead), CancellationToken.None).ConfigureAwait(false);
                if (bytesRead == 0)
                {
                    break;
                }

                headerBytesRead += bytesRead;
            }

            if (!ProcessHeaderBytesRead(headerBytesRead))
            {
                return;
            }
        }
        catch (IOException e)
        {
            CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync: {0}", e);
            _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
            Close();
            return;
        }

        NodePacketType packetType = (NodePacketType)_headerByte[0];
        int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));

        // Resize the reusable buffer to the body length and read directly into its backing array.
        _readBufferMemoryStream.SetLength(packetLength);
        byte[] packetData = _readBufferMemoryStream.GetBuffer();

        try
        {
            // Same partial-read handling as for the header.
            int totalBytesRead = 0;
            while (totalBytesRead < packetLength)
            {
                int bytesRead = await _pipeStream.ReadAsync(packetData.AsMemory(totalBytesRead, packetLength - totalBytesRead), CancellationToken.None).ConfigureAwait(false);
                if (bytesRead == 0)
                {
                    break;
                }

                totalBytesRead += bytesRead;
            }

            if (!ProcessBodyBytesRead(totalBytesRead, packetLength, packetType))
            {
                return;
            }
        }
        catch (IOException e)
        {
            CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync (Reading): {0}", e);
            _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
            Close();
            return;
        }

        // Read and route the packet.
        if (!ReadAndRoutePacket(packetType))
        {
            return;
        }

        // A shutdown packet is the last one we expect from the node: disconnect and stop reading.
        if (packetType == NodePacketType.NodeShutdown)
        {
            Close();
            return;
        }
    }
}
#endif
/// <summary>
/// Queues the specified packet to be written to this node and returns immediately.
/// The actual write happens on the dedicated drain thread. This is because SendData()
/// is on a hot path under the primary lock (BuildManager's _syncLock)
/// and we want to minimize our time there.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
    // Record that the terminating packet is on its way before it becomes visible to the drain thread.
    bool terminatesNode = IsExitPacket(packet);
    if (terminatesNode)
    {
        _exitPacketState = ExitPacketState.ExitPacketQueued;
    }

    _packetWriteQueue.Enqueue(packet);

    // Wake up the drain thread.
    _packetEnqueued.Set();
}
/// <summary>
/// Body of the dedicated writer thread: waits for the enqueue signal, then serializes and writes
/// every queued packet to the node's stream. We use a dedicated thread to avoid blocking a
/// threadpool thread.
/// </summary>
/// <remarks>Usually there'll be a single packet in the queue, but sometimes
/// a burst of SendData comes in, with 10-20 packets scheduled.
/// The method returns once a <c>NodeBuildComplete</c> packet has been written.</remarks>
/// <param name="state">The <c>NodeContext</c> whose queue is drained.</param>
private void DrainPacketQueue(object state)
{
    NodeContext owner = (NodeContext)state;
    MemoryStream scratch = owner._writeBufferMemoryStream;
    Stream pipe = owner._pipeStream;
    ITranslator translator = owner._writeTranslator;

    for (; ; )
    {
        owner._packetEnqueued.WaitOne();

        while (owner._packetWriteQueue.TryDequeue(out INodePacket pending))
        {
            // clear the buffer but keep the underlying capacity to avoid reallocations
            scratch.SetLength(0);

            try
            {
                NodePacketType kind = pending.Type;

                // Write packet type with extended header.
                // On the receiving side we will check if the extended header is present before making an attempt to read the packet version.
                bool hasExtendedHeader = NodePacketTypeExtensions.TryCreateExtendedHeaderType(_handshakeOptions, kind, out byte headerTypeByte);
                scratch.WriteByte(headerTypeByte);

                // Reserve 4 bytes for the packet length; the real value is patched in below.
                WriteInt32(scratch, 0);

                if (hasExtendedHeader)
                {
                    // Write extended header with version BEFORE writing packet data
                    NodePacketTypeExtensions.WriteVersion(scratch, owner._negotiatedPacketVersion);
                    translator.NegotiatedPacketVersion = owner._negotiatedPacketVersion;
                }
                else if (!Handshake.IsHandshakeOptionEnabled(_handshakeOptions, HandshakeOptions.CLR2))
                {
                    // CLR4 task hosts: set version to 0 to enable version-dependent fields.
                    // CLR2 task hosts: leave as null (default) to skip version-dependent fields.
                    translator.NegotiatedPacketVersion = 0;
                }

                pending.Translate(translator);
                int totalLength = (int)scratch.Position;

                // Now plug in the real packet length: everything after the 5-byte header.
                scratch.Position = 1;
                WriteInt32(scratch, totalLength - 5);

                // Push the serialized packet to the pipe in chunks of at most MaxPacketWriteSize bytes.
                byte[] payload = scratch.GetBuffer();
                int offset = 0;
                while (offset < totalLength)
                {
                    int chunkLength = Math.Min(totalLength - offset, MaxPacketWriteSize);
                    pipe.Write(payload, offset, chunkLength);
                    offset += chunkLength;
                }

                if (!(pending is NodeBuildComplete))
                {
                    continue;
                }

                if (IsExitPacket(pending))
                {
                    owner._exitPacketState = ExitPacketState.ExitPacketSent;
                    owner._packetQueueDrainDelayCancellation.Cancel();
                }

                // Nothing is written after a NodeBuildComplete packet; end the thread.
                return;
            }
            catch (IOException e)
            {
                // Do nothing here because any exception will be caught by the async read handler
                CommunicationsUtilities.Trace(owner._nodeId, "EXCEPTION in SendData: {0}", e);
            }
            catch (ObjectDisposedException) // This happens if a child dies unexpectedly
            {
                // Do nothing here because any exception will be caught by the async read handler
            }
        }
    }
}
/// <summary>
/// Determines whether the packet is a <c>NodeBuildComplete</c> packet whose
/// <c>PrepareForReuse</c> flag is not set.
/// </summary>
/// <param name="packet">The packet to inspect.</param>
/// <returns><c>true</c> for such a packet; otherwise <c>false</c>.</returns>
private static bool IsExitPacket(INodePacket packet)
{
    if (packet is NodeBuildComplete buildComplete)
    {
        return !buildComplete.PrepareForReuse;
    }

    return false;
}
/// <summary>
/// Writes a 32-bit integer to the stream as four bytes, least significant byte first.
/// Avoids having a BinaryWriter just to write a 4-byte int.
/// </summary>
/// <param name="stream">The stream to write to, at its current position.</param>
/// <param name="value">The value to write.</param>
private static void WriteInt32(MemoryStream stream, int value)
{
    // Emit bits 0-7, 8-15, 16-23 and 24-31 in that order.
    for (int shift = 0; shift < 32; shift += 8)
    {
        stream.WriteByte((byte)(value >> shift));
    }
}
/// <summary>
/// Disconnects this context from the node: disposes the stream used to talk to it
/// and then invokes the terminate delegate with the node id.
/// </summary>
private void Close()
{
    _pipeStream.Dispose();
    _terminateDelegate.Invoke(_nodeId);
}
/// <summary>
/// Waits for the child node process to exit and kills it if it does not exit in time.
/// </summary>
/// <remarks>
/// If the exit packet is still queued, waits up to 100ms for it to be sent. If it has been sent,
/// polls the process for up to <c>TimeoutForWaitForExit</c> milliseconds with a growing delay.
/// If the exit packet was not sent, or the process is still running after the timeout,
/// a warning is logged and the process tree is killed.
/// </remarks>
/// <param name="loggingService">Service used to log the kill warning; may be null, in which case nothing is logged.</param>
/// <returns>A task that completes when the process has exited or has been killed.</returns>
public async Task WaitForExitAsync(ILoggingService loggingService)
{
    if (_exitPacketState == ExitPacketState.ExitPacketQueued)
    {
        // Wait up to 100ms until all remaining packets are sent.
        // We don't need to wait long, just long enough for the Task to start running on the ThreadPool.
        // The delay is cancelled as soon as the exit packet has been written.
#if NET
        await Task.Delay(100, _packetQueueDrainDelayCancellation.Token).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
#else
        // WhenAny swallows the cancellation of the delay. ConfigureAwait(false) keeps this branch
        // consistent with the other awaits here: none of them resumes on a captured context.
        await Task.WhenAny(Task.Delay(100, _packetQueueDrainDelayCancellation.Token)).ConfigureAwait(false);
#endif
    }

    _packetQueueDrainDelayCancellation?.Dispose();

    if (_exitPacketState == ExitPacketState.ExitPacketSent)
    {
        CommunicationsUtilities.Trace("Waiting for node with pid = {0} to exit", _process.Id);

        // .NET 5 introduces a real WaitForExitAsync.
        // This is a poor man's implementation that uses polling.
        int timeout = TimeoutForWaitForExit;
        int delay = 5;
        while (timeout > 0)
        {
            bool exited = _process.WaitForExit(milliseconds: 0);
            if (exited)
            {
                return;
            }

            timeout -= delay;
            await Task.Delay(delay).ConfigureAwait(false);

            // Double delay up to 500ms.
            delay = Math.Min(delay * 2, 500);
        }
    }

    // The exit packet was never sent, or the node did not exit in time:
    // kill the child and do a blocking wait.
    loggingService?.LogWarning(
        BuildEventContext.Invalid,
        null,
        BuildEventFileInfo.Empty,
        "KillingProcessWithPid",
        _process.Id);
    CommunicationsUtilities.Trace("Killing node with pid = {0}", _process.Id);
    _process.KillTree(timeoutMilliseconds: 5000);
}
/// <summary>
/// Validates the number of bytes obtained by a header read.
/// </summary>
/// <remarks>
/// On a mismatch the error and the state of the node process are traced, a <see cref="NodeShutdown"/>
/// packet with <see cref="NodeShutdownReason.ConnectionFailed"/> is routed and the context is closed.
/// </remarks>
/// <param name="bytesRead">Number of header bytes actually read.</param>
/// <returns><c>true</c> if a complete header was read; otherwise <c>false</c>.</returns>
private bool ProcessHeaderBytesRead(int bytesRead)
{
    // Happy path: the whole header arrived.
    if (bytesRead == _headerByte.Length)
    {
        return true;
    }

    CommunicationsUtilities.Trace(_nodeId, "COMMUNICATIONS ERROR (HRC) Node: {0} Process: {1} Bytes Read: {2} Expected: {3}", _nodeId, _process.Id, bytesRead, _headerByte.Length);

    // Best effort diagnostics only: querying the process may itself fail.
    try
    {
        if (!_process.HasExited)
        {
            CommunicationsUtilities.Trace(_nodeId, " Child Process {0} is still running.", _process.Id);
        }
        else
        {
            CommunicationsUtilities.Trace(_nodeId, " Child Process {0} has exited.", _process.Id);
        }
    }
    catch (Exception e) when (!ExceptionHandling.IsCriticalException(e))
    {
        CommunicationsUtilities.Trace(_nodeId, "Unable to retrieve remote process information. {0}", e);
    }

    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
    Close();
    return false;
}
#if FEATURE_APM
/// <summary>
/// Completion callback for a packet header read. Validates the header and
/// starts the asynchronous read of the packet body.
/// </summary>
/// <param name="result">The async result of the header read.</param>
private void HeaderReadComplete(IAsyncResult result)
{
    int headerBytesRead;
    try
    {
        try
        {
            headerBytesRead = _pipeStream.EndRead(result);
        }
        catch (ArgumentException)
        {
            // Workaround for CLR stress bug; it sporadically calls us twice on the same async
            // result, and EndRead will throw on the second one. Pretend the second one never happened.
            CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
            return;
        }

        bool headerIsComplete = ProcessHeaderBytesRead(headerBytesRead);
        if (!headerIsComplete)
        {
            return;
        }
    }
    catch (IOException e)
    {
        CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in HeaderReadComplete: {0}", e);
        _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
        Close();
        return;
    }

    // Bytes 1..4 of the header hold the little-endian body length; remember it for the body callback.
    int bodyLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
    _currentPacketLength = bodyLength;
    MSBuildEventSource.Log.PacketReadSize(bodyLength);

    // Ensures the buffer is at least this length.
    // It avoids reallocations if the buffer is already large enough.
    _readBufferMemoryStream.SetLength(bodyLength);
    byte[] bodyBuffer = _readBufferMemoryStream.GetBuffer();

    _pipeStream.BeginRead(bodyBuffer, 0, bodyLength, _bodyReadCompleteCallback, this);
}
#endif
/// <summary>
/// Validates the number of bytes obtained by a packet body read.
/// </summary>
/// <remarks>
/// On a mismatch the error is traced, a <see cref="NodeShutdown"/> packet with
/// <see cref="NodeShutdownReason.ConnectionFailed"/> is routed and the context is closed.
/// </remarks>
/// <param name="bytesRead">Number of body bytes actually read.</param>
/// <param name="packetLength">Body length announced by the packet header.</param>
/// <param name="packetType">Type of the packet, used for tracing only.</param>
/// <returns><c>true</c> if the whole body was read; otherwise <c>false</c>.</returns>
private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketType packetType)
{
    if (bytesRead == packetLength)
    {
        return true;
    }

    CommunicationsUtilities.Trace(_nodeId, "Bad packet read for packet {0} - Expected {1} bytes, got {2}", packetType, packetLength, bytesRead);
    _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
    Close();
    return false;
}
/// <summary>
/// Deserializes the packet currently held in the read buffer and routes it through the packet factory.
/// </summary>
/// <remarks>
/// If an <see cref="IOException"/> is thrown, a <see cref="NodeShutdown"/> packet with
/// <see cref="NodeShutdownReason.ConnectionFailed"/> is routed and the context is closed.
/// </remarks>
/// <param name="packetType">Type of the packet taken from the packet header.</param>
/// <returns><c>true</c> if the packet was routed; <c>false</c> if an I/O error occurred.</returns>
private bool ReadAndRoutePacket(NodePacketType packetType)
{
    try
    {
        // Rewind the buffer so the read translator starts at the beginning of the body.
        _readBufferMemoryStream.Position = 0;
        _packetFactory.DeserializeAndRoutePacket(_nodeId, packetType, _readTranslator);
        return true;
    }
    catch (IOException ioFailure)
    {
        CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in ReadAndRoutPacket: {0}", ioFailure);
        _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
        Close();
        return false;
    }
}
#if FEATURE_APM
/// <summary>
/// Completion callback for a packet body read. Validates the body, routes the packet and then
/// either starts reading the next packet or, after a shutdown packet, closes the context.
/// </summary>
/// <param name="result">The async result of the body read.</param>
private void BodyReadComplete(IAsyncResult result)
{
    // Capture the type first: the header buffer is reused by the next header read.
    NodePacketType packetType = (NodePacketType)_headerByte[0];

    int bodyBytesRead;
    try
    {
        try
        {
            bodyBytesRead = _pipeStream.EndRead(result);
        }
        catch (ArgumentException)
        {
            // Workaround for CLR stress bug; it sporadically calls us twice on the same async
            // result, and EndRead will throw on the second one. Pretend the second one never happened.
            CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
            return;
        }

        bool bodyIsComplete = ProcessBodyBytesRead(bodyBytesRead, _currentPacketLength, packetType);
        if (!bodyIsComplete)
        {
            return;
        }
    }
    catch (IOException e)
    {
        CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in BodyReadComplete (Reading): {0}", e);
        _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
        Close();
        return;
    }

    // Read and route the packet.
    if (!ReadAndRoutePacket(packetType))
    {
        return;
    }

    // Once a shutdown packet has been routed, disconnect instead of reading further.
    if (packetType == NodePacketType.NodeShutdown)
    {
        Close();
        return;
    }

    // Read the next packet.
    BeginAsyncPacketRead();
}
#endif
}
}
}
|