|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Concurrent;
#if !FEATURE_THREAD_CULTURE
using System.Globalization;
#endif
using System.Threading;
using Microsoft.Build.Shared;
using BuildParameters = Microsoft.Build.Execution.BuildParameters;
#nullable disable
namespace Microsoft.Build.BackEnd
{
/// <summary>
/// This is an implementation of INodeEndpoint for in-proc nodes. This endpoint can use either
/// synchronous or asynchronous packet processing methods. When synchronous processing methods are
/// used, the SendData method will cause the OnDataReceived event on the receiving endpoint to be called
/// on the same thread, blocking until the handler returns. The asynchronous method more closely emulates
/// the way other kinds of endpoints work, as the recipient processes the packet on a different thread
/// than that from which the packet originated, but with the cost of the extra thread.
/// </summary>
internal class NodeEndpointInProc : INodeEndpoint
{
#region Private Data
/// <summary>
/// An object for the two inproc endpoints to synchronize on.
/// </summary>
private static Object s_locker = new Object();
/// <summary>
/// The current communication status of the node.
/// </summary>
private LinkStatus _status;
/// <summary>
/// The communications mode
/// </summary>
private EndpointMode _mode;
/// <summary>
/// The peer endpoint
/// </summary>
private NodeEndpointInProc _peerEndpoint;
/// <summary>
/// The build component host
/// </summary>
private IBuildComponentHost _componentHost;
/// <summary>
/// The packet factory used to route packets.
/// </summary>
private INodePacketFactory _packetFactory;
// The following private data fields are used only when the endpoint is in ASYNCHRONOUS mode.
/// <summary>
/// Object used as a lock source for the async data
/// </summary>
private object _asyncDataMonitor;
/// <summary>
/// Set when a packet is available in the packet queue
/// </summary>
private AutoResetEvent _packetAvailable;
/// <summary>
/// Set when the asynchronous packet pump should terminate
/// </summary>
private AutoResetEvent _terminatePacketPump;
/// <summary>
/// The thread which runs the asynchronous packet pump
/// </summary>
private Thread _packetPump;
/// <summary>
/// Set to true if our peer is connected to us.
/// </summary>
private bool _peerConnected;
/// <summary>
/// The asynchronous packet queue.
/// </summary>
/// <remarks>
/// Operations on this queue must be synchronized since it is accessible by multiple threads.
/// Use a lock on the packetQueue itself.
/// </remarks>
private ConcurrentQueue<INodePacket> _packetQueue;
#endregion
#region Constructors and Factories
/// <summary>
/// Instantiates a Node and initializes it to unconnected.
/// </summary>
/// <param name="commMode">The communications mode for this endpoint.</param>
/// <param name="host">The component host.</param>
private NodeEndpointInProc(EndpointMode commMode, IBuildComponentHost host)
{
ErrorUtilities.VerifyThrowArgumentNull(host);
_status = LinkStatus.Inactive;
_mode = commMode;
_componentHost = host;
if (commMode == EndpointMode.Asynchronous)
{
_asyncDataMonitor = new object();
}
}
#endregion
#region INodeEndpoint Events
/// <summary>
/// Raised when the link status has changed.
/// </summary>
public event LinkStatusChangedDelegate OnLinkStatusChanged;
#endregion
#region Public Types and Enums
/// <summary>
/// Represents the style of communications used by the in-proc endpoint.
/// </summary>
internal enum EndpointMode
{
/// <summary>
/// The DataReceived event is raised on the same thread as that which called SendData.
/// </summary>
Synchronous,
/// <summary>
/// The DataReceived event is raised on a separate thread from that which called SendData
/// </summary>
Asynchronous
}
#endregion
#region INodeEndpoint Properties
/// <summary>
/// Returns the link status of this node.
/// </summary>
public LinkStatus LinkStatus
{
get { return _status; }
}
#endregion
#region INodeEndpoint Methods
/// <summary>
/// Causes this endpoint to wait for the remote endpoint to connect
/// </summary>
/// <param name="factory">Unused</param>
public void Listen(INodePacketFactory factory)
{
ErrorUtilities.VerifyThrowInternalNull(factory);
_packetFactory = factory;
// Initialize our thread in async mode so we are ready when the Node-side endpoint "connects".
if (_mode == EndpointMode.Asynchronous)
{
InitializeAsyncPacketThread();
}
_peerEndpoint.SetPeerNodeConnected();
}
/// <summary>
/// Causes this node to connect to the matched endpoint.
/// </summary>
/// <param name="factory">Unused</param>
public void Connect(INodePacketFactory factory)
{
ErrorUtilities.VerifyThrowInternalNull(factory);
_packetFactory = factory;
// Set up asynchronous packet pump, if necessary.
if (_mode == EndpointMode.Asynchronous)
{
InitializeAsyncPacketThread();
}
// Notify the Build Manager-side endpoint that the connection is now active.
_peerEndpoint.SetPeerNodeConnected();
}
/// <summary>
/// Shuts down the link
/// </summary>
public void Disconnect()
{
InternalDisconnect();
// Notify the remote endpoint that the link is dead
_peerEndpoint.SetPeerNodeDisconnected();
}
/// <summary>
/// Sends data to the peer endpoint.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
ErrorUtilities.VerifyThrow(_status == LinkStatus.Active, "Cannot send when link status is not active. Current status {0}", _status);
if (_mode == EndpointMode.Synchronous)
{
_peerEndpoint._packetFactory.RoutePacket(0, packet);
}
else
{
EnqueuePacket(packet);
}
}
public void ClientWillDisconnect()
{
// We do not need to do anything here for InProc node.
}
#endregion
#region Internal Methods
/// <summary>
/// This method is used to create a matched pair of endpoints used by the Node Provider and
/// the Node. The inputs and outputs for each node are automatically configured.
/// </summary>
/// <param name="mode">The communications mode for the endpoints.</param>
/// <param name="host">The component host.</param>
/// <returns>A matched pair of endpoints.</returns>
internal static EndpointPair CreateInProcEndpoints(EndpointMode mode, IBuildComponentHost host)
{
NodeEndpointInProc node = new NodeEndpointInProc(mode, host);
NodeEndpointInProc manager = new NodeEndpointInProc(mode, host);
// NOTE: This creates a circular reference which must be explicitly broken before these
// objects can be reclaimed by the garbage collector.
node._peerEndpoint = manager;
manager._peerEndpoint = node;
return new EndpointPair(node, manager);
}
#endregion
#region Private Event Methods
/// <summary>
/// Invokes the OnLinkStatusChanged event in a thread-safe manner.
/// </summary>
/// <param name="newStatus">The new status of the endpoint link.</param>
private void RaiseLinkStatusChanged(LinkStatus newStatus)
{
if (OnLinkStatusChanged != null)
{
LinkStatusChangedDelegate linkStatusDelegate = OnLinkStatusChanged;
linkStatusDelegate(this, newStatus);
}
}
#endregion
#region Private Methods
/// <summary>
/// This method is called by the other endpoint when it is ready to establish the connection.
/// </summary>
private void SetPeerNodeConnected()
{
lock (s_locker)
{
_peerConnected = true;
if (_peerEndpoint._peerConnected)
{
ChangeLinkStatus(LinkStatus.Active);
_peerEndpoint.ChangeLinkStatus(LinkStatus.Active);
}
}
}
/// <summary>
/// This method is called by either side to notify this endpoint that the link is inactive.
/// </summary>
private void SetPeerNodeDisconnected()
{
_peerConnected = false;
InternalDisconnect();
}
/// <summary>
/// This does the actual work of changing the status and shutting down any threads we may have for
/// disconnection.
/// </summary>
private void InternalDisconnect()
{
ErrorUtilities.VerifyThrow(_status == LinkStatus.Active, "Endpoint is not connected. Current status {0}", _status);
ChangeLinkStatus(LinkStatus.Inactive);
// Terminate our thread if we were in async mode
if (_mode == EndpointMode.Asynchronous)
{
TerminateAsyncPacketThread();
}
}
/// <summary>
/// Updates the current link status if it has changed and notifies any registered delegates.
/// </summary>
/// <param name="newStatus">The status the node should now be in.</param>
private void ChangeLinkStatus(LinkStatus newStatus)
{
ErrorUtilities.VerifyThrow(_status != newStatus, "Attempting to change status to existing status {0}.", _status);
_status = newStatus;
RaiseLinkStatusChanged(_status);
}
#region Asynchronous Mode Methods
/// <summary>
/// Adds a packet to the packet queue when asynchronous mode is enabled.
/// </summary>
/// <param name="packet">The packet to be transmitted.</param>
private void EnqueuePacket(INodePacket packet)
{
ErrorUtilities.VerifyThrowArgumentNull(packet);
ErrorUtilities.VerifyThrow(_mode == EndpointMode.Asynchronous, "EndPoint mode is synchronous, should be asynchronous");
ErrorUtilities.VerifyThrow(_packetQueue != null, "packetQueue is null");
ErrorUtilities.VerifyThrow(_packetAvailable != null, "packetAvailable is null");
_packetQueue.Enqueue(packet);
_packetAvailable.Set();
}
/// <summary>
/// Initializes the packet pump thread and the supporting events as well as the packet queue.
/// </summary>
private void InitializeAsyncPacketThread()
{
lock (_asyncDataMonitor)
{
ErrorUtilities.VerifyThrow(_packetPump == null, "packetPump != null");
ErrorUtilities.VerifyThrow(_packetAvailable == null, "packetAvailable != null");
ErrorUtilities.VerifyThrow(_terminatePacketPump == null, "terminatePacketPump != null");
ErrorUtilities.VerifyThrow(_packetQueue == null, "packetQueue != null");
#if FEATURE_THREAD_CULTURE
_packetPump = new Thread(PacketPumpProc);
#else
// In .NET Core, we need to set the current culture from inside the new thread
CultureInfo culture = _componentHost.BuildParameters.Culture;
CultureInfo uiCulture = _componentHost.BuildParameters.UICulture;
_packetPump = new Thread(() =>
{
CultureInfo.CurrentCulture = culture;
CultureInfo.CurrentUICulture = uiCulture;
PacketPumpProc();
});
#endif
_packetPump.Name = "InProc Endpoint Packet Pump";
_packetAvailable = new AutoResetEvent(false);
_terminatePacketPump = new AutoResetEvent(false);
_packetQueue = new ConcurrentQueue<INodePacket>();
#if FEATURE_THREAD_CULTURE
_packetPump.CurrentCulture = _componentHost.BuildParameters.Culture;
_packetPump.CurrentUICulture = _componentHost.BuildParameters.UICulture;
#endif
_packetPump.Start();
}
}
/// <summary>
/// Shuts down the packet pump thread and cleans up associated data.
/// </summary>
private void TerminateAsyncPacketThread()
{
lock (_asyncDataMonitor)
{
ErrorUtilities.VerifyThrow(_packetPump != null, "packetPump == null");
ErrorUtilities.VerifyThrow(_packetAvailable != null, "packetAvailable == null");
ErrorUtilities.VerifyThrow(_terminatePacketPump != null, "terminatePacketPump == null");
ErrorUtilities.VerifyThrow(_packetQueue != null, "packetQueue == null");
_terminatePacketPump.Set();
if (!_packetPump.Join((int)new TimeSpan(0, 0, BuildParameters.EndpointShutdownTimeout).TotalMilliseconds))
{
#if FEATURE_THREAD_ABORT
// We timed out. Kill it.
_packetPump.Abort();
#endif
}
_packetPump = null;
_packetAvailable.Dispose();
_packetAvailable = null;
_terminatePacketPump.Dispose();
_terminatePacketPump = null;
_packetQueue = null;
}
}
/// <summary>
/// This method handles the asynchronous message pump. It waits for messages to show up on the queue
/// and calls FireDataAvailable for each such packet. It will terminate when the terminate event is
/// set.
/// </summary>
private void PacketPumpProc()
{
try
{
// Ordering of the wait handles is important. The first signalled wait handle in the array
// will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
// terminate event triggered so that we cannot get into a situation where packets are being
// spammed to the endpoint and it never gets an opportunity to shutdown.
WaitHandle[] handles = [_terminatePacketPump, _packetAvailable];
bool exitLoop = false;
do
{
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
{
case 0:
exitLoop = true;
break;
case 1:
{
INodePacket packet;
while (_packetQueue.TryDequeue(out packet))
{
_peerEndpoint._packetFactory.RoutePacket(0, packet);
}
}
break;
default:
ErrorUtilities.ThrowInternalError("waitId {0} out of range.", waitId);
break;
}
}
while (!exitLoop);
}
catch (Exception e)
{
// Dump all engine exceptions to a temp file
// so that we have something to go on in the
// event of a failure
ExceptionHandling.DumpExceptionToFile(e);
throw;
}
}
#endregion
#endregion
#region Structs
/// <summary>
/// Used to return a matched pair of endpoints for in-proc nodes to use with the Build Manager.
/// </summary>
internal struct EndpointPair
{
/// <summary>
/// The endpoint destined for use by a node.
/// </summary>
internal readonly NodeEndpointInProc NodeEndpoint;
/// <summary>
/// The endpoint destined for use by the Build Manager
/// </summary>
internal readonly NodeEndpointInProc ManagerEndpoint;
/// <summary>
/// Creates an endpoint pair
/// </summary>
/// <param name="node">The node-side endpoint.</param>
/// <param name="manager">The manager-side endpoint.</param>
internal EndpointPair(NodeEndpointInProc node, NodeEndpointInProc manager)
{
NodeEndpoint = node;
ManagerEndpoint = manager;
}
}
#endregion
}
}
|