File: Projects\AppHostRpcClient.cs
Web Access
Project: src\src\Aspire.Cli\Aspire.Cli.Tool.csproj (aspire)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.IO.Pipes;
using System.Net.Sockets;
using Aspire.Cli.Backchannel;
using Aspire.Hosting.Ats;
using StreamJsonRpc;
 
namespace Aspire.Cli.Projects;
 
/// <summary>
/// Implementation of <see cref="IAppHostRpcClient"/> using JSON-RPC over sockets/pipes.
/// </summary>
internal sealed class AppHostRpcClient : IAppHostRpcClient
{
    private readonly Stream _stream;
    private readonly JsonRpc _jsonRpc;
 
    private AppHostRpcClient(Stream stream, JsonRpc jsonRpc)
    {
        _stream = stream;
        _jsonRpc = jsonRpc;
    }
 
    /// <summary>
    /// Creates and connects an RPC client to the specified socket path.
    /// </summary>
    public static async Task<AppHostRpcClient> ConnectAsync(string socketPath, CancellationToken cancellationToken)
    {
        var stream = await ConnectToServerAsync(socketPath, cancellationToken);
 
        var formatter = BackchannelJsonSerializerContext.CreateRpcMessageFormatter();
        var handler = new HeaderDelimitedMessageHandler(stream, stream, formatter);
        var jsonRpc = new JsonRpc(handler);
        jsonRpc.StartListening();
 
        return new AppHostRpcClient(stream, jsonRpc);
    }
 
    // ═══════════════════════════════════════════════════════════════
    // TYPED WRAPPERS
    // ═══════════════════════════════════════════════════════════════
 
    /// <inheritdoc />
    public Task<RuntimeSpec> GetRuntimeSpecAsync(string languageId, CancellationToken cancellationToken)
        => _jsonRpc.InvokeWithCancellationAsync<RuntimeSpec>("getRuntimeSpec", [languageId], cancellationToken);
 
    /// <inheritdoc />
    public Task<Dictionary<string, string>> ScaffoldAppHostAsync(
        string languageId, string targetPath, string? projectName, CancellationToken cancellationToken)
        => _jsonRpc.InvokeWithCancellationAsync<Dictionary<string, string>>(
            "scaffoldAppHost", [languageId, targetPath, projectName], cancellationToken);
 
    /// <inheritdoc />
    public Task<Dictionary<string, string>> GenerateCodeAsync(string languageId, CancellationToken cancellationToken)
        => _jsonRpc.InvokeWithCancellationAsync<Dictionary<string, string>>(
            "generateCode", [languageId], cancellationToken);
 
    // ═══════════════════════════════════════════════════════════════
    // GENERIC INVOKE
    // ═══════════════════════════════════════════════════════════════
 
    /// <inheritdoc />
    public Task<T> InvokeAsync<T>(string methodName, object?[] parameters, CancellationToken cancellationToken)
        => _jsonRpc.InvokeWithCancellationAsync<T>(methodName, parameters, cancellationToken);
 
    /// <inheritdoc />
    public Task InvokeAsync(string methodName, object?[] parameters, CancellationToken cancellationToken)
        => _jsonRpc.InvokeWithCancellationAsync(methodName, parameters, cancellationToken);
 
    /// <inheritdoc />
    public async ValueTask DisposeAsync()
    {
        _jsonRpc.Dispose();
        await _stream.DisposeAsync();
    }
 
    // ═══════════════════════════════════════════════════════════════
    // CONNECTION LOGIC
    // ═══════════════════════════════════════════════════════════════
 
    /// <summary>
    /// Connects to the RPC server using platform-appropriate transport.
    /// </summary>
    private static async Task<Stream> ConnectToServerAsync(string socketPath, CancellationToken cancellationToken)
    {
        var connected = false;
        var startTime = DateTimeOffset.UtcNow;
        const int ConnectionTimeoutSeconds = 30;
 
        if (OperatingSystem.IsWindows())
        {
            var pipeClient = new NamedPipeClientStream(".", socketPath, PipeDirection.InOut, PipeOptions.Asynchronous);
 
            while (!connected && (DateTimeOffset.UtcNow - startTime) < TimeSpan.FromSeconds(ConnectionTimeoutSeconds))
            {
                try
                {
                    await pipeClient.ConnectAsync(cancellationToken).ConfigureAwait(false);
                    connected = true;
                }
                catch (TimeoutException)
                {
                    await Task.Delay(100, cancellationToken).ConfigureAwait(false);
                }
                catch (IOException)
                {
                    await Task.Delay(100, cancellationToken).ConfigureAwait(false);
                }
            }
 
            if (!connected)
            {
                pipeClient.Dispose();
                throw new InvalidOperationException($"Failed to connect to RPC server at {socketPath}");
            }
 
            return pipeClient;
        }
        else
        {
            var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
            var endpoint = new UnixDomainSocketEndPoint(socketPath);
 
            while (!connected && (DateTimeOffset.UtcNow - startTime) < TimeSpan.FromSeconds(ConnectionTimeoutSeconds))
            {
                try
                {
                    await socket.ConnectAsync(endpoint, cancellationToken).ConfigureAwait(false);
                    connected = true;
                }
                catch (SocketException)
                {
                    await Task.Delay(100, cancellationToken).ConfigureAwait(false);
                }
            }
 
            if (!connected)
            {
                socket.Dispose();
                throw new InvalidOperationException($"Failed to connect to RPC server at {socketPath}");
            }
 
            return new NetworkStream(socket, ownsSocket: true);
        }
    }
}
 
/// <summary>
/// Factory for creating <see cref="IAppHostRpcClient"/> instances.
/// </summary>
internal sealed class AppHostRpcClientFactory : IAppHostRpcClientFactory
{
    /// <inheritdoc />
    public async Task<IAppHostRpcClient> ConnectAsync(string socketPath, CancellationToken cancellationToken)
    {
        return await AppHostRpcClient.ConnectAsync(socketPath, cancellationToken);
    }
}