File: CoordinatorServer.cs
Web Access
Project: src\msbuild\src\MSBuild.Coordinator\MSBuild.Coordinator.csproj (MSBuild.Coordinator)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Diagnostics;
using System.IO.Pipes;
using System.Text;
using Microsoft.Build.Framework.Coordinator;
using Microsoft.Build.Internal;

namespace Microsoft.Build.Coordinator;

/// <summary>
///  The coordinator server that listens for MSBuild client connections on a named pipe,
///  manages node grants, and monitors build health via heartbeats.
/// </summary>
/// <remarks>
///  Each client goes through a fixed preamble (Handshake, then RequestNodes). The server then replies
///  with either a NodeGrant or a Wait message and processes Heartbeat and ReleaseNodes messages until
///  the client releases its grant or disconnects. Grants held by clients that stop sending heartbeats
///  are reclaimed once their process is no longer running.
/// </remarks>
/// <param name="settings">Configuration for the coordinator (pipe name, budget, timeouts, etc.).</param>
/// <param name="output">Optional debug trace output. Defaults to file-based trace logging gated on MSBUILDDEBUGCOMM.</param>
internal sealed partial class CoordinatorServer(CoordinatorSettings settings, ICoordinatorDebugOutput? output = null) : IDisposable
{
    private readonly CoordinatorSettings _settings = settings;
    private readonly NodeBudgetManager _budgetManager = new(settings.TotalNodeBudget);
    private readonly string _pipeName = settings.PipeName;
    private readonly int _heartbeatIntervalMs = settings.HeartbeatIntervalMs;
    private readonly int _shutdownTimeoutMs = settings.ShutdownTimeoutMs;

    // Guarded by _connectionLock.
    private readonly Dictionary<Guid, ClientConnection> _connectionsById = [];

    // Connections whose grant has not yet been returned to the budget. Guarded by _connectionLock.
    // Reference equality is used because each instance represents one pipe session.
    private readonly HashSet<ClientConnection> _unreleasedConnections = new(ReferenceEqualityComparer.Instance);

    private readonly ReaderWriterLockSlim _connectionLock = new();
    private readonly CancellationTokenSource _cts = new();
    private readonly ICoordinatorDebugOutput _output = output ?? DefaultDebugOutput.Instance;
    private Timer? _heartbeatMonitor;
    private Timer? _shutdownTimer;

    // 0 = live, 1 = Dispose has started. Read from timer callbacks, hence Interlocked/Volatile access.
    private int _disposeState;

    private bool DisposeStarted => Volatile.Read(ref _disposeState) != 0;

    /// <summary>
    ///  Cancels the server, stops its timers, and releases its synchronization primitives.
    ///  Calling this more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposeState, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        Interlocked.Exchange(ref _heartbeatMonitor, null)?.Dispose();
        Interlocked.Exchange(ref _shutdownTimer, null)?.Dispose();
        _cts.Dispose();
        _connectionLock.Dispose();
    }

    /// <summary>
    ///  Runs the coordinator server until cancellation is requested or the auto-shutdown
    ///  timeout elapses with no active or waiting builds.
    /// </summary>
    /// <param name="cancellationToken">Token to signal the server should stop accepting connections.</param>
    public async Task RunAsync(CancellationToken cancellationToken = default)
    {
        using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
        CancellationToken token = linked.Token;

        // Start heartbeat monitoring.
        _heartbeatMonitor = new Timer(
            CheckHeartbeats,
            state: null,
            dueTime: _heartbeatIntervalMs,
            period: _heartbeatIntervalMs);

        // Start auto-shutdown timer.
        ResetShutdownTimer();

        _output.WriteLine($"CoordinatorServer: Accept loop started on pipe '{_pipeName}' (budget={_settings.TotalNodeBudget})");

        ConcurrentDictionary<Task, byte> clientTasks = [];

        try
        {
            while (!token.IsCancellationRequested)
            {
                NamedPipeServerStream? pipeStream = await WaitForClientAsync(token);

                if (pipeStream is null)
                {
                    break;
                }

                // Dispatch client handling to the thread pool so the accept loop immediately
                // creates the next pipe instance. This is necessary because HandleClientAsync
                // performs synchronous reads (ReadClientMessage) before its first await,
                // which would block the accept loop if run inline.
                //
                // CancellationToken.None is intentional: we don't want Task.Run to cancel
                // before HandleClientAsync starts, which would orphan the pipe stream.
                // HandleClientAsync receives the cancellation token for its own loop.
                Task clientTask = Task.Run(() => HandleClientAsync(pipeStream, token), CancellationToken.None);
                clientTasks.TryAdd(clientTask, 0);

                // Remove task from tracking when it completes to prevent unbounded growth.
                _ = clientTask.ContinueWith(_ => clientTasks.TryRemove(clientTask, out byte _), TaskScheduler.Default);
            }
        }
        finally
        {
            _output.WriteLine("CoordinatorServer: Accept loop exiting");
            Interlocked.Exchange(ref _heartbeatMonitor, null)?.Dispose();
            Interlocked.Exchange(ref _shutdownTimer, null)?.Dispose();

            // Wait for all remaining client tasks to complete before exiting.
            // This ensures logging (which may reference the test context) completes cleanly.
            ICollection<Task> remainingTasks = clientTasks.Keys;

            if (remainingTasks.Count > 0)
            {
                try
                {
                    await Task.WhenAll(remainingTasks);
                }
                catch
                {
                    // Swallow exceptions from client tasks; they're already logged.
                }
            }

            // Draining handlers release their grants, and ReleaseConnection re-arms the shutdown
            // timer. Dispose whatever timer they left behind so nothing fires after we return.
            Interlocked.Exchange(ref _shutdownTimer, null)?.Dispose();
        }
    }

    /// <summary>
    ///  Waits for a client to connect to the named pipe.
    /// </summary>
    /// <param name="token">Cancellation token to abort the wait.</param>
    /// <returns>
    ///  The connected pipe stream, or <see langword="null"/> if the wait was cancelled.
    /// </returns>
    private async Task<NamedPipeServerStream?> WaitForClientAsync(CancellationToken token)
    {
        NamedPipeServerStream pipeStream = new(
            _pipeName,
            PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);

        try
        {
            await pipeStream.WaitForConnectionAsync(token);
            return pipeStream;
        }
        catch (OperationCanceledException)
        {
            pipeStream.Dispose();
            return null;
        }
        catch
        {
            // Don't leak the pipe instance if the wait fails for any other reason.
            pipeStream.Dispose();
            throw;
        }
    }

    /// <summary>
    ///  Handles a single client connection for its entire lifetime.
    /// </summary>
    /// <param name="pipeStream">The connected pipe stream for this client. Disposed before this method completes.</param>
    /// <param name="token">Cancellation token to signal the client loop should exit.</param>
    private async Task HandleClientAsync(NamedPipeServerStream pipeStream, CancellationToken token)
    {
        ClientConnection? connection = null;

        try
        {
            // All reads from the client are synchronous and ignore the token, so close the pipe on
            // cancellation to unblock them. Without this, a client that stops sending keeps this task
            // alive forever, and RunAsync (which waits for every client task) never returns.
            using CancellationTokenRegistration cancellationRegistration = token.Register(
                static state => ((NamedPipeServerStream)state!).Dispose(),
                pipeStream);

            if (ReadClientPreamble(pipeStream) is not { } preamble)
            {
                return;
            }

            (ClientHandshakeMessage handshake, RequestNodesMessage request) = preamble;

            _output.WriteLine($"CoordinatorServer: Client connected (PID {handshake.ProcessId}, ConnectionId {handshake.ConnectionId}, requested {request.RequestedNodes} nodes)");

            BuildGrant grant = new(handshake.ConnectionId, handshake.ProcessId, request.RequestedNodes);
            ClientConnection client = new(handshake.ConnectionId, handshake.ProcessId, handshake.Capabilities, grant, pipeStream);
            connection = client;

            // Register before asking for nodes. If the request is queued, a release on another
            // thread must be able to find this connection to deliver the deferred grant.
            using (_connectionLock.EnterDisposableWriteLock())
            {
                _connectionsById[handshake.ConnectionId] = client;
                _unreleasedConnections.Add(client);
            }

            // Hold the writer lock across TryGrant and the reply. ReleaseConnection writes deferred
            // grants to this writer from other threads under the same lock. This keeps the two
            // messages from interleaving on the pipe and ensures Wait is sent before any deferred
            // NodeGrant for this request.
            lock (client.Writer)
            {
                int grantedNodes = _budgetManager.TryGrant(grant);

                if (grantedNodes > 0)
                {
                    _output.WriteLine($"CoordinatorServer: Granted {grantedNodes} nodes to PID {handshake.ProcessId}");
                    client.Writer.Write(new NodeGrantMessage(grantedNodes));
                }
                else
                {
                    // The grant will be fulfilled later (see ReleaseConnection) when resources free up.
                    _output.WriteLine($"CoordinatorServer: PID {handshake.ProcessId} queued (no nodes available)");
                    client.Writer.Write(WaitMessage.Instance);
                }
            }

            ResetShutdownTimer();

            // Process subsequent messages (heartbeats and release).
            while (!token.IsCancellationRequested && pipeStream.IsConnected)
            {
                ClientMessage message;

                try
                {
                    message = await Task.Run(() => client.Reader.ReadClientMessage(), token);
                }
                catch (EndOfStreamException)
                {
                    // Client disconnected.
                    _output.WriteLine($"CoordinatorServer: PID {handshake.ProcessId} disconnected (end of stream)");
                    break;
                }
                catch (IOException)
                {
                    // Pipe broken.
                    _output.WriteLine($"CoordinatorServer: PID {handshake.ProcessId} disconnected (pipe broken)");
                    break;
                }
                catch (Exception) when (token.IsCancellationRequested)
                {
                    // The server is stopping. Either Task.Run observed the token, or the registration
                    // above closed the pipe underneath the read.
                    _output.WriteLine($"CoordinatorServer: PID {handshake.ProcessId} connection closed (server stopping)");
                    break;
                }

                switch (message)
                {
                    case HeartbeatMessage:
                        grant.LastHeartbeat = DateTime.UtcNow;
                        break;

                    case ReleaseNodesMessage:
                        // The finally block below releases the grant and disposes the connection.
                        _output.WriteLine($"CoordinatorServer: PID {handshake.ProcessId} released grant");
                        return;
                }
            }
        }
        catch (Exception ex) when (!Debugger.IsAttached)
        {
            // Swallow exceptions from individual client handling.
            _output.WriteLine($"CoordinatorServer: Exception handling client: {ex.Message}");
        }
        finally
        {
            // Explicit release, disconnect, crash, and shutdown all end up here. ReleaseConnection is
            // idempotent, so this is safe even if the heartbeat monitor already reclaimed the grant.
            if (connection is not null)
            {
                try
                {
                    ReleaseConnection(connection);
                }
                catch (Exception ex) when (!Debugger.IsAttached)
                {
                    _output.WriteLine($"CoordinatorServer: Exception releasing client: {ex.Message}");
                }
                finally
                {
                    connection.Dispose();
                }
            }

            // Possibly redundant with ClientConnection.Dispose, but this guarantees the pipe is
            // closed for rejected clients and regardless of ClientConnection's implementation.
            pipeStream.Dispose();
        }
    }

    /// <summary>
    ///  Runs the connection preamble: reads the client's Handshake, replies with the server
    ///  handshake, then reads and validates the RequestNodes message.
    /// </summary>
    /// <param name="pipeStream">The connected pipe stream for this client. It is left open.</param>
    /// <returns>
    ///  The handshake and node request, or <see langword="null"/> if the client was rejected. On
    ///  rejection an <c>ErrorMessage</c> has already been written to the client.
    /// </returns>
    private (ClientHandshakeMessage Handshake, RequestNodesMessage Request)? ReadClientPreamble(NamedPipeServerStream pipeStream)
    {
        // Scope these wrappers to the preamble so they are disposed while the pipe is still open.
        // With leaveOpen, BinaryWriter.Dispose flushes the stream, and that fails on a closed pipe.
        // The rest of the session uses the ClientConnection's own Reader/Writer.
        using BinaryReader reader = new(pipeStream, Encoding.UTF8, leaveOpen: true);
        using BinaryWriter writer = new(pipeStream, Encoding.UTF8, leaveOpen: true);

        // The first message must be a Handshake.
        ClientMessage firstMessage = reader.ReadClientMessage();

        if (firstMessage is not ClientHandshakeMessage handshake)
        {
            _output.WriteLine($"CoordinatorServer: Rejected client — first message was {firstMessage.GetType().Name}");
            writer.Write(new ErrorMessage("First message must be Handshake"));
            return null;
        }

        _output.WriteLine($"CoordinatorServer: Handshake received (ConnectionId {handshake.ConnectionId}, PID {handshake.ProcessId}, Capabilities: [{string.Join(", ", handshake.Capabilities)}])");

        // Respond with server capabilities.
        writer.Write(new ServerHandshakeMessage([]));

        // The second message must be RequestNodes.
        ClientMessage secondMessage = reader.ReadClientMessage();

        if (secondMessage is not RequestNodesMessage request)
        {
            _output.WriteLine($"CoordinatorServer: Rejected client — second message was {secondMessage.GetType().Name}");
            writer.Write(new ErrorMessage("Second message must be RequestNodes"));
            return null;
        }

        if (handshake.ProcessId <= 0 || request.RequestedNodes <= 0)
        {
            _output.WriteLine($"CoordinatorServer: Rejected client — invalid request (PID={handshake.ProcessId}, RequestedNodes={request.RequestedNodes})");
            writer.Write(new ErrorMessage("Invalid request: ProcessId and RequestedNodes must be > 0"));
            return null;
        }

        return (handshake, request);
    }

    /// <summary>
    ///  Returns a connection's grant to the budget, unregisters the connection, and sends
    ///  NodeGrant messages to any queued builds that the freed nodes were handed to.
    /// </summary>
    /// <remarks>
    ///  Only the first call for a given connection has any effect. A connection can reach this
    ///  method more than once: via the heartbeat monitor and then its own handler, or via a failed
    ///  deferred notification and then its own handler.
    /// </remarks>
    /// <param name="connection">The client connection whose grant is being released.</param>
    private void ReleaseConnection(ClientConnection connection)
    {
        using (_connectionLock.EnterDisposableWriteLock())
        {
            if (!_unreleasedConnections.Remove(connection))
            {
                // Already released. Releasing the same grant again could return its nodes to the
                // budget twice.
                return;
            }

            // Only remove if this connection is still current for the connection ID. A later
            // connection may have been registered under the same ID.
            if (_connectionsById.TryGetValue(connection.ConnectionId, out ClientConnection? current) &&
                current == connection)
            {
                _connectionsById.Remove(connection.ConnectionId);
            }
        }

        ImmutableArray<BuildGrant> newlyGranted = _budgetManager.Release(connection.Grant);

        if (newlyGranted.Length > 0)
        {
            _output.WriteLine($"CoordinatorServer: Draining wait queue, {newlyGranted.Length} build(s) to notify");
        }

        // Notify newly granted builds outside the connection lock.
        foreach (BuildGrant grant in newlyGranted)
        {
            ClientConnection? waitingConnection;

            using (_connectionLock.EnterDisposableReadLock())
            {
                _connectionsById.TryGetValue(grant.ConnectionId, out waitingConnection);
            }

            if (waitingConnection is null)
            {
                continue;
            }

            try
            {
                _output.WriteLine($"CoordinatorServer: Granting {grant.GrantedNodes} deferred nodes to PID {grant.ProcessId}");

                // Same lock the waiting client's handler holds while it sends its initial reply.
                lock (waitingConnection.Writer)
                {
                    waitingConnection.Writer.Write(new NodeGrantMessage(grant.GrantedNodes));
                }
            }
            catch (Exception ex) when (ex is IOException or ObjectDisposedException)
            {
                // The client disconnected while waiting, or its handler disposed the connection
                // concurrently. Release its grant too; this is a no-op if the handler already did.
                _output.WriteLine($"CoordinatorServer: PID {grant.ProcessId} disconnected while waiting");
                ReleaseConnection(waitingConnection);
            }
        }

        ResetShutdownTimer();
    }

    /// <summary>
    ///  Periodically checks for builds that have missed heartbeats and reclaims their grants.
    /// </summary>
    /// <param name="state">Timer callback state (unused).</param>
    private void CheckHeartbeats(object? state)
    {
        // This runs as a timer callback on the thread pool, where an unhandled exception terminates
        // the process. Failures are logged instead of propagated.
        try
        {
            if (DisposeStarted)
            {
                return;
            }

            DateTime threshold = DateTime.UtcNow - TimeSpan.FromMilliseconds(_settings.HeartbeatTimeoutMs);

            List<ClientConnection> connectionsToCheck;

            using (_connectionLock.EnterDisposableReadLock())
            {
                connectionsToCheck = [.. _connectionsById.Values];
            }

            foreach (ClientConnection connection in connectionsToCheck)
            {
                if (connection.Grant.LastHeartbeat >= threshold)
                {
                    continue;
                }

                // Check if the process is still alive before reclaiming.
                if (IsProcessAlive(connection.ProcessId))
                {
                    continue;
                }

                _output.WriteLine($"CoordinatorServer: Reclaiming grant from dead PID {connection.ProcessId}");

                // ReleaseConnection acquires its own write lock.
                ReleaseConnection(connection);
            }
        }
        catch (Exception ex) when (!Debugger.IsAttached)
        {
            _output.WriteLine($"CoordinatorServer: Exception checking heartbeats: {ex.Message}");
        }
    }

    /// <summary>
    ///  Checks whether a process with the given ID is still running.
    /// </summary>
    /// <param name="processId">The OS process ID to check.</param>
    /// <returns>
    ///  <see langword="false"/> if the process is known to be gone; otherwise <see langword="true"/>,
    ///  including when its state cannot be queried.
    /// </returns>
    private static bool IsProcessAlive(int processId)
    {
        try
        {
            using Process process = Process.GetProcessById(processId);
            return !process.HasExited;
        }
        catch (ArgumentException)
        {
            // Process doesn't exist.
            return false;
        }
        catch (Exception ex) when (ex is InvalidOperationException or System.ComponentModel.Win32Exception)
        {
            // The process state couldn't be queried (e.g. its exit status could not be retrieved).
            // Treat it as alive so a grant is only reclaimed from a process that is known to be gone.
            return true;
        }
    }

    /// <summary>
    ///  Resets the auto-shutdown timer. If no builds are active or waiting when the timer
    ///  fires, the coordinator shuts down. Does nothing once <see cref="Dispose"/> has started.
    /// </summary>
    private void ResetShutdownTimer()
    {
        if (DisposeStarted)
        {
            return;
        }

        Timer newTimer = new(
            OnShutdownTimerFired,
            state: null,
            dueTime: _shutdownTimeoutMs,
            period: Timeout.Infinite);

        Interlocked.Exchange(ref _shutdownTimer, newTimer)?.Dispose();

        // Dispose may have run between the check above and the exchange, missing the timer
        // installed here. Dispose it now so it does not outlive the server.
        if (DisposeStarted)
        {
            Interlocked.Exchange(ref _shutdownTimer, null)?.Dispose();
        }
    }

    /// <summary>
    ///  Auto-shutdown timer callback. Cancels the server if no builds are active or waiting.
    /// </summary>
    /// <param name="state">Timer callback state (unused).</param>
    private void OnShutdownTimerFired(object? state)
    {
        if (DisposeStarted || !_budgetManager.IsIdle)
        {
            return;
        }

        _output.WriteLine("CoordinatorServer: Auto-shutdown (no active or waiting builds)");

        try
        {
            _cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose won the race; the server is already stopping. Throwing here would crash
            // the process because this runs as a timer callback.
        }
    }
}