// File: ResourceService\DashboardClient.cs
// Web Access
// Project: src\src\Aspire.Dashboard\Aspire.Dashboard.csproj (Aspire.Dashboard)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Immutable;
using System.Diagnostics;
using System.Net.Security;
using System.Runtime.CompilerServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Channels;
using Aspire.Dashboard.Configuration;
using Aspire.Dashboard.Utils;
using Aspire.Hosting;
using Aspire.ResourceService.Proto.V1;
using Grpc.Core;
using Grpc.Net.Client;
using Grpc.Net.Client.Configuration;
using Microsoft.Extensions.Options;
 
namespace Aspire.Dashboard.Model;
 
/// <summary>
/// Implements gRPC client that communicates with a resource server, populating data for the dashboard.
/// </summary>
/// <remarks>
/// <para>
/// An instance of this type is created per service call, so this class should not hold onto any state
/// expected to live longer than a single RPC request. In the case of streaming requests, the instance
/// lives until the stream is closed.
/// </para>
/// <para>
/// If the <c>DOTNET_RESOURCE_SERVICE_ENDPOINT_URL</c> environment variable is not specified, then there's
/// no known endpoint to connect to, and this dashboard client will be disabled. Calls to
/// <see cref="IDashboardClient.SubscribeResourcesAsync"/> and <see cref="IDashboardClient.SubscribeConsoleLogs"/>
/// will throw if <see cref="IDashboardClientStatus.IsEnabled"/> is <see langword="false"/>. Callers should
/// check this property first, before calling these methods.
/// </para>
/// </remarks>
internal sealed class DashboardClient : IDashboardClient
{
    // Name of the request header that carries the API key when the auth mode is ApiKey.
    private const string ApiKeyHeaderName = "x-resource-service-api-key";

    // Latest known view model for each resource, keyed by resource name. Accessed under _lock.
    private readonly Dictionary<string, ResourceViewModel> _resourceByName = new(StringComparers.ResourceName);
    // Cancelled when the client is disposed (or immediately, when the client is disabled).
    private readonly CancellationTokenSource _cts = new();
    // Copy of _cts.Token taken in the constructor, so the token can be used after _cts is disposed.
    private readonly CancellationToken _clientCancellationToken;
    // Completed (or faulted) by the application information request; cancelled when the client is disabled.
    private readonly TaskCompletionSource _whenConnectedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    // Completed once the first initial-data message is processed, or by SetInitialDataReceived.
    private readonly TaskCompletionSource _initialDataReceivedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    // Guards _resourceByName and keeps subscriber registration consistent with the snapshot handed out.
    private readonly object _lock = new();

    private readonly ILoggerFactory _loggerFactory;
    private readonly IDashboardClientStatus _dashboardClientStatus;
    private readonly BrowserTimeProvider _timeProvider;
    private readonly IKnownPropertyLookup _knownPropertyLookup;
    private readonly DashboardOptions _dashboardOptions;
    private readonly ILogger<DashboardClient> _logger;

    // One producer-consumer channel per active resource subscription. Replaced atomically, never mutated.
    private ImmutableHashSet<Channel<IReadOnlyList<ResourceViewModelChange>>> _outgoingChannels = [];
    // Application name reported by the resource service; null until the application information response arrives.
    private string? _applicationName;

    // Lifecycle states stored in _state.
    // Disabled: no resource service is configured, so calls that need it throw.
    private const int StateDisabled = -1;
    // None: constructed, but the background connection has not been started yet.
    private const int StateNone = 0;
    // Initialized: the background connection task has been started.
    private const int StateInitialized = 1;
    // Disposed: DisposeAsync has run.
    private const int StateDisposed = 2;
    private int _state = StateNone;

    // Both are left null when the client is disabled.
    private readonly GrpcChannel? _channel;
    private readonly DashboardService.DashboardServiceClient? _client;
    // Headers sent with every call; holds the API key header when the auth mode is ApiKey.
    private readonly Metadata _headers = [];

    // Background task that connects and watches resources; assigned by EnsureInitialized.
    private Task? _connection;
 
    /// <summary>
    /// Creates the client and, when the resource service is enabled, builds the gRPC channel and
    /// service client used for every call.
    /// </summary>
    /// <remarks>
    /// When <see cref="IDashboardClientStatus.IsEnabled"/> is <see langword="false"/> the client is put
    /// into the disabled state: the client token and the connected task are cancelled and no channel is created.
    /// The background connection itself is not started here; it is started on first use by <c>EnsureInitialized</c>.
    /// </remarks>
    /// <param name="loggerFactory">Factory used for this class's logger and passed to the gRPC channel.</param>
    /// <param name="configuration">Configuration bound onto the SSL options when certificate auth is used.</param>
    /// <param name="dashboardOptions">Dashboard options; the resource service client settings are read from here.</param>
    /// <param name="dashboardClientStatus">Reports whether the dashboard client is enabled.</param>
    /// <param name="timeProvider">Time provider passed to view model conversion.</param>
    /// <param name="knownPropertyLookup">Known property lookup passed to view model conversion.</param>
    /// <param name="configureHttpHandler">
    /// Optional callback invoked with the <see cref="SocketsHttpHandler"/> after it has been configured
    /// and before the channel is created.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// Certificate auth is configured but the certificate source is not recognized, or no matching
    /// certificate is found in the key store.
    /// </exception>
    public DashboardClient(
        ILoggerFactory loggerFactory,
        IConfiguration configuration,
        IOptions<DashboardOptions> dashboardOptions,
        IDashboardClientStatus dashboardClientStatus,
        BrowserTimeProvider timeProvider,
        IKnownPropertyLookup knownPropertyLookup,
        Action<SocketsHttpHandler>? configureHttpHandler = null)
    {
        _loggerFactory = loggerFactory;
        _dashboardClientStatus = dashboardClientStatus;
        _timeProvider = timeProvider;
        _knownPropertyLookup = knownPropertyLookup;
        _dashboardOptions = dashboardOptions.Value;

        // Take a copy of the token and always use it to avoid race between disposal of CTS and usage of token.
        _clientCancellationToken = _cts.Token;

        _logger = loggerFactory.CreateLogger<DashboardClient>();

        if (!_dashboardClientStatus.IsEnabled)
        {
            _state = StateDisabled;

            // Use a constant message template with a placeholder rather than an interpolated string,
            // so the template stays static for structured logging. The rendered text is unchanged.
            _logger.LogDebug(
                "{ConfigKey} is not specified. Dashboard client services are unavailable.",
                DashboardConfigNames.ResourceServiceUrlName.ConfigKey);

            _cts.Cancel();
            _whenConnectedTcs.TrySetCanceled();
            return;
        }

        var address = _dashboardOptions.ResourceServiceClient.GetUri()!;
        _logger.LogDebug("Dashboard configured to connect to: {Address}", address);

        // Create the gRPC channel. This channel performs automatic reconnects.
        // We will dispose it when we are disposed.
        _channel = CreateChannel();

        if (_dashboardOptions.ResourceServiceClient.AuthMode is ResourceClientAuthMode.ApiKey)
        {
            // We're using an API key for auth, so set it in the headers we pass on each call.
            _headers.Add(ApiKeyHeaderName, _dashboardOptions.ResourceServiceClient.ApiKey!);
        }

        _client = new DashboardService.DashboardServiceClient(_channel);

        // Builds the HTTP handler (keep-alive, optional client certificate), the retry policy
        // and finally the channel for the configured address.
        GrpcChannel CreateChannel()
        {
            var httpHandler = new SocketsHttpHandler
            {
                EnableMultipleHttp2Connections = true,
                KeepAlivePingDelay = TimeSpan.FromSeconds(20),
                KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
            };

            var authMode = _dashboardOptions.ResourceServiceClient.AuthMode;

            if (authMode == ResourceClientAuthMode.Certificate)
            {
                // Auth hasn't been suppressed, so configure it.
                var certificates = _dashboardOptions.ResourceServiceClient.ClientCertificate.Source switch
                {
                    DashboardClientCertificateSource.File => GetFileCertificate(),
                    DashboardClientCertificateSource.KeyStore => GetKeyStoreCertificate(),
                    _ => throw new InvalidOperationException("Unable to load ResourceServiceClient client certificate.")
                };

                httpHandler.SslOptions = new SslClientAuthenticationOptions
                {
                    ClientCertificates = certificates
                };

                // Bind the SSL configuration section onto the options created above.
                configuration.Bind("Dashboard:ResourceServiceClient:Ssl", httpHandler.SslOptions);
            }

            // https://learn.microsoft.com/aspnet/core/grpc/retries

            // Applies to every method (MethodName.Default): retry calls that fail with Unavailable.
            var methodConfig = new MethodConfig
            {
                Names = { MethodName.Default },
                RetryPolicy = new RetryPolicy
                {
                    MaxAttempts = 5,
                    InitialBackoff = TimeSpan.FromSeconds(1),
                    MaxBackoff = TimeSpan.FromSeconds(5),
                    BackoffMultiplier = 1.5,
                    RetryableStatusCodes = { StatusCode.Unavailable }
                }
            };

            // Give the caller the last word on the handler before it is handed to the channel.
            configureHttpHandler?.Invoke(httpHandler);

            // https://learn.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging

            return GrpcChannel.ForAddress(
                address,
                channelOptions: new()
                {
                    HttpHandler = httpHandler,
                    ServiceConfig = new() { MethodConfigs = { methodConfig } },
                    LoggerFactory = _loggerFactory,
                    ThrowOperationCanceledOnCancellation = true
                });

            // Loads the client certificate from the configured file path and password.
            X509CertificateCollection GetFileCertificate()
            {
                Debug.Assert(
                    _dashboardOptions.ResourceServiceClient.ClientCertificate.FilePath != null,
                    "FilePath is validated as not null when configuration is loaded.");

                var filePath = _dashboardOptions.ResourceServiceClient.ClientCertificate.FilePath;
                var password = _dashboardOptions.ResourceServiceClient.ClientCertificate.Password;

                return [new X509Certificate2(filePath, password)];
            }

            // Finds valid certificates by subject name in the configured store
            // (store "My" and location CurrentUser when not configured).
            X509CertificateCollection GetKeyStoreCertificate()
            {
                Debug.Assert(
                    _dashboardOptions.ResourceServiceClient.ClientCertificate.Subject != null,
                    "Subject is validated as not null when configuration is loaded.");

                var subject = _dashboardOptions.ResourceServiceClient.ClientCertificate.Subject;
                var storeName = _dashboardOptions.ResourceServiceClient.ClientCertificate.Store ?? "My";
                var location = _dashboardOptions.ResourceServiceClient.ClientCertificate.Location ?? StoreLocation.CurrentUser;

                using var store = new X509Store(storeName: storeName, storeLocation: location);

                store.Open(OpenFlags.ReadOnly);

                var certificates = store.Certificates.Find(X509FindType.FindBySubjectName, findValue: subject, validOnly: true);

                if (certificates is [])
                {
                    throw new InvalidOperationException($"Unable to load client certificate with subject \"{subject}\" from key store.");
                }

                return certificates;
            }
        }
    }
 
    /// <summary>
    /// Holds a certificate store name together with its <see cref="StoreLocation"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): nothing else in this file references this type — confirm it is still used elsewhere.
    /// </remarks>
    internal sealed class KeyStoreProperties
    {
        /// <summary>The store name. Must be set at initialization.</summary>
        public required string Name { get; set; }
        /// <summary>The store location. Must be set at initialization.</summary>
        public required StoreLocation Location { get; set; }
    }
 
    /// <summary>
    /// Number of resource subscriber channels currently registered. Exposed for tests.
    /// </summary>
    internal int OutgoingResourceSubscriberCount
    {
        get { return _outgoingChannels.Count; }
    }

    /// <summary>
    /// Whether the client state is anything other than disabled. Because this reads the state field,
    /// it also returns <see langword="true"/> once the client has been disposed.
    /// </summary>
    public bool IsEnabled
    {
        get { return _state != StateDisabled; }
    }
 
    /// <summary>
    /// Starts the background connection on first use, and validates the client state on every use.
    /// </summary>
    /// <remarks>
    /// The first caller to move the state from "none" to "initialized" starts a background task that
    /// requests the application information and then watches resources, reconnecting with back-off
    /// after RPC errors. Later callers return immediately.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The client is disabled.</exception>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    private void EnsureInitialized()
    {
        // Atomically claim the None -> Initialized transition; only the winner starts the connection.
        var priorState = Interlocked.CompareExchange(ref _state, value: StateInitialized, comparand: StateNone);

        if (priorState is StateDisabled)
        {
            throw new InvalidOperationException($"{nameof(DashboardClient)} is disabled. Check the {nameof(IsEnabled)} property before calling this.");
        }

        if (priorState is not StateNone)
        {
            // Already initialized (nothing to do) or disposed (throw).
            ObjectDisposedException.ThrowIf(priorState is StateDisposed, this);
            return;
        }

        _connection = Task.Run(() => ConnectAndWatchResourcesAsync(_clientCancellationToken), _clientCancellationToken);

        return;

        // Body of the background connection task.
        async Task ConnectAndWatchResourcesAsync(CancellationToken cancellationToken)
        {
            try
            {
                await ConnectAsync().ConfigureAwait(false);

                await WatchResourcesWithRecoveryAsync().ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Ignore. This is likely caused by the dashboard client being disposed. We don't want to log.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error loading data from the resource service.");
                throw;
            }

            // Requests the application information and completes the connected task with the outcome.
            // A failure is reported through the task only; it does not stop the resource watch from starting.
            async Task ConnectAsync()
            {
                try
                {
                    var response = await _client!.GetApplicationInformationAsync(new(), headers: _headers, cancellationToken: cancellationToken);

                    _applicationName = response.ApplicationName;

                    _whenConnectedTcs.TrySetResult();
                }
                catch (Exception ex)
                {
                    _whenConnectedTcs.TrySetException(ex);
                }
            }

            // Watches resources until cancelled, reconnecting after RPC errors.
            async Task WatchResourcesWithRecoveryAsync()
            {
                // Track the number of errors we've seen since the last successfully received message.
                // As this number climbs, we extend the amount of time between reconnection attempts, in
                // order to avoid flooding the server with requests. This value is reset to zero whenever
                // a message is successfully received.
                var errorCount = 0;

                // NOTE(review): if the server completes the stream without an error, this loop reconnects
                // immediately with no delay, because errorCount is still zero. Confirm that is intended.
                while (true)
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    if (errorCount > 0)
                    {
                        // The most recent attempt failed. There may be more than one failure.
                        // We wait for a period of time determined by the number of errors,
                        // where the time grows exponentially, until a threshold.
                        var delay = ExponentialBackOff(errorCount, maxSeconds: 15);

                        await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                    }

                    try
                    {
                        await WatchResourcesAsync().ConfigureAwait(false);
                    }
                    catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
                    {
                        // There's a race condition between reconnect attempts and client disposal.
                        // This has been observed in unit tests where the client is created and disposed
                        // very quickly. This check should probably be in the gRPC library instead.
                    }
                    catch (RpcException ex)
                    {
                        errorCount++;

                        _logger.LogError(ex, "Error #{ErrorCount} watching resources.", errorCount);
                    }
                }

                // 1s, 2s, 4s, ... capped at maxSeconds.
                static TimeSpan ExponentialBackOff(int errorCount, double maxSeconds)
                {
                    return TimeSpan.FromSeconds(Math.Min(Math.Pow(2, errorCount - 1), maxSeconds));
                }

                // Opens one watch stream and applies every update to the model until the stream ends or fails.
                async Task WatchResourcesAsync()
                {
                    // Dispose the streaming call when we leave this method. If reading stops before the
                    // stream has finished (for example a FormatException on an unexpected message),
                    // this releases the call instead of leaving it open.
                    using var call = _client!.WatchResources(new WatchResourcesRequest { IsReconnect = errorCount != 0 }, headers: _headers, cancellationToken: cancellationToken);

                    await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
                    {
                        List<ResourceViewModelChange>? changes = null;

                        lock (_lock)
                        {
                            // We received a message, which means we are connected. Clear the error count.
                            errorCount = 0;

                            if (response.KindCase == WatchResourcesUpdate.KindOneofCase.InitialData)
                            {
                                // Populate our map using the initial data.
                                _resourceByName.Clear();

                                // TODO send a "clear" event via outgoing channels, in case consumers have extra items to be removed

                                foreach (var resource in response.InitialData.Resources)
                                {
                                    // Add to map.
                                    var viewModel = resource.ToViewModel(_timeProvider, _knownPropertyLookup);
                                    _resourceByName[resource.Name] = viewModel;

                                    // Send this update to any subscribers too.
                                    changes ??= [];
                                    changes.Add(new(ResourceViewModelChangeType.Upsert, viewModel));
                                }

                                // Unblocks subscribers waiting for the first snapshot.
                                _initialDataReceivedTcs.TrySetResult();
                            }
                            else if (response.KindCase == WatchResourcesUpdate.KindOneofCase.Changes)
                            {
                                // Apply changes to the model.
                                foreach (var change in response.Changes.Value)
                                {
                                    changes ??= [];

                                    if (change.KindCase == WatchResourcesChange.KindOneofCase.Upsert)
                                    {
                                        // Upsert (i.e. add or replace)
                                        var viewModel = change.Upsert.ToViewModel(_timeProvider, _knownPropertyLookup);
                                        _resourceByName[change.Upsert.Name] = viewModel;
                                        changes.Add(new(ResourceViewModelChangeType.Upsert, viewModel));
                                    }
                                    else if (change.KindCase == WatchResourcesChange.KindOneofCase.Delete)
                                    {
                                        // Remove
                                        if (_resourceByName.Remove(change.Delete.ResourceName, out var removed))
                                        {
                                            changes.Add(new(ResourceViewModelChangeType.Delete, removed));
                                        }
                                        else
                                        {
                                            Debug.Fail("Attempt to remove an unknown resource view model.");
                                        }
                                    }
                                    else
                                    {
                                        throw new FormatException($"Unexpected {nameof(WatchResourcesChange)} kind: {change.KindCase}");
                                    }
                                }
                            }
                            else
                            {
                                throw new FormatException($"Unexpected {nameof(WatchResourcesUpdate)} kind: {response.KindCase}");
                            }
                        }

                        // Publish outside the lock so subscribers are not written to while the model is locked.
                        if (changes is not null)
                        {
                            foreach (var channel in _outgoingChannels)
                            {
                                // Channel is unbound so TryWrite always succeeds.
                                channel.Writer.TryWrite(changes);
                            }
                        }
                    }
                }
            }
        }
    }
 
    /// <summary>
    /// Gets the task that is completed by the application information request. Reading this property
    /// starts the background connection if it has not been started yet.
    /// </summary>
    Task IDashboardClient.WhenConnected
    {
        get
        {
            // Every page awaits this task to render the title, yet some pages never subscribe to
            // resources. Reading the property therefore has to be enough to kick off the connection.
            EnsureInitialized();

            var connectedTask = _whenConnectedTcs.Task;
            return connectedTask;
        }
    }
 
    /// <summary>
    /// Gets the application name: the name reported by the resource service if one has been received,
    /// otherwise the name from the dashboard options, otherwise "Aspire".
    /// </summary>
    string IDashboardClient.ApplicationName =>
        _applicationName ?? _dashboardOptions.ApplicationName ?? "Aspire";
 
    /// <summary>
    /// Waits for the initial resource data, then returns a snapshot of the current resources together
    /// with a stream of subsequent changes.
    /// </summary>
    /// <param name="cancellationToken">
    /// Cancels the wait for initial data and, combined with the client's own token, the returned stream.
    /// </param>
    /// <returns>The current resources and an async stream of batched changes.</returns>
    /// <exception cref="InvalidOperationException">The client is disabled.</exception>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    /// <exception cref="OperationCanceledException">
    /// The wait for initial data was cancelled by <paramref name="cancellationToken"/> or by client disposal.
    /// </exception>
    async Task<ResourceViewModelSubscription> IDashboardClient.SubscribeResourcesAsync(CancellationToken cancellationToken)
    {
        EnsureInitialized();

        // On success, ownership of this linked source passes to StreamUpdatesAsync, which disposes it
        // when enumeration of the stream ends.
        var cts = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);

        try
        {
            // Wait for initial data to be received from the server. This allows initial data to be returned with subscription when client is starting.
            await _initialDataReceivedTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
        }
        catch
        {
            // The stream that would normally dispose the linked source is never created on this path,
            // so release it here rather than leaving it registered on the long-lived client token.
            cts.Dispose();
            throw;
        }

        // There are two types of channel in this class. This is not a gRPC channel.
        // It's a producer-consumer queue channel, used to push updates to subscribers
        // without blocking the producer here.
        var channel = Channel.CreateUnbounded<IReadOnlyList<ResourceViewModelChange>>(
            new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });

        // Register the channel and take the snapshot under the same lock the producer uses, so no
        // update can fall between the snapshot and the start of the stream.
        lock (_lock)
        {
            ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Add(channel), channel);

            return new ResourceViewModelSubscription(
                InitialState: _resourceByName.Values.ToImmutableArray(),
                Subscription: StreamUpdatesAsync(cts.Token));
        }

        // Yields batched changes until cancelled, then unregisters the channel and disposes the linked source.
        // The cleanup only runs once the stream has been enumerated.
        async IAsyncEnumerable<IReadOnlyList<ResourceViewModelChange>> StreamUpdatesAsync([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
        {
            try
            {
                await foreach (var batch in channel.GetBatchesAsync(minReadInterval: TimeSpan.FromMilliseconds(100), cancellationToken: enumeratorCancellationToken).ConfigureAwait(false))
                {
                    if (batch.Count == 1)
                    {
                        // Common case: pass the single list through without copying.
                        yield return batch[0];
                    }
                    else
                    {
                        // Flatten several change lists into one.
                        yield return batch.SelectMany(changes => changes).ToList();
                    }
                }
            }
            finally
            {
                cts.Dispose();
                ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
            }
        }
    }
 
    /// <summary>
    /// Streams console log lines for a resource, batched with a minimum read interval.
    /// </summary>
    /// <param name="resourceName">Name of the resource whose console logs are requested.</param>
    /// <param name="cancellationToken">Cancels the stream; combined with the client's own token.</param>
    /// <returns>An async stream of log line batches.</returns>
    /// <exception cref="InvalidOperationException">The client is disabled.</exception>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    async IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> IDashboardClient.SubscribeConsoleLogs(string resourceName, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        EnsureInitialized();

        using var combinedTokens = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);

        // Dispose the streaming call when enumeration ends, however it ends.
        using var call = _client!.WatchResourceConsoleLogs(
            new WatchResourceConsoleLogsRequest() { ResourceName = resourceName },
            headers: _headers,
            cancellationToken: combinedTokens.Token);

        // Write incoming logs to a channel, and then read from that channel to yield the logs.
        // We do this to batch logs together and enforce a minimum read interval.
        var channel = Channel.CreateUnbounded<IReadOnlyList<ResourceLogLine>>(
            new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });

        var readTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: combinedTokens.Token).ConfigureAwait(false))
                {
                    var logLines = new ResourceLogLine[response.LogLines.Count];

                    for (var i = 0; i < logLines.Length; i++)
                    {
                        logLines[i] = new ResourceLogLine(response.LogLines[i].LineNumber, response.LogLines[i].Text, response.LogLines[i].IsStdErr);
                    }

                    // Channel is unbound so TryWrite always succeeds.
                    channel.Writer.TryWrite(logLines);
                }
            }
            finally
            {
                // Completing the writer is what ends the batch loop below, whether reading finished or failed.
                channel.Writer.TryComplete();
            }
        }, combinedTokens.Token);

        try
        {
            await foreach (var batch in channel.GetBatchesAsync(TimeSpan.FromMilliseconds(100), combinedTokens.Token).ConfigureAwait(false))
            {
                if (batch.Count == 1)
                {
                    yield return batch[0];
                }
                else
                {
                    yield return batch.SelectMany(lines => lines).ToList();
                }
            }

            // Surface any error from the reader once the channel has been drained.
            await readTask.ConfigureAwait(false);
        }
        finally
        {
            // If the consumer stopped enumerating before the server finished the stream, the reader
            // above would otherwise keep pulling log lines into a channel nobody reads. Cancelling
            // the linked token stops it. This is a no-op when the reader has already finished
            // or the token is already cancelled.
            await combinedTokens.CancelAsync().ConfigureAwait(false);
        }
    }
 
    /// <summary>
    /// Sends a command for a resource to the resource service and returns the outcome.
    /// </summary>
    /// <param name="resourceName">Name of the target resource.</param>
    /// <param name="resourceType">Type of the target resource.</param>
    /// <param name="command">The command to execute; its name and parameter are sent in the request.</param>
    /// <param name="cancellationToken">Cancels the call; combined with the client's own token.</param>
    /// <returns>
    /// The service's response as a view model, or a failed response with an error message when the
    /// call throws an <see cref="RpcException"/>.
    /// </returns>
    /// <exception cref="InvalidOperationException">The client is disabled.</exception>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task<ResourceCommandResponseViewModel> ExecuteResourceCommandAsync(string resourceName, string resourceType, CommandViewModel command, CancellationToken cancellationToken)
    {
        EnsureInitialized();

        var commandRequest = new ResourceCommandRequest
        {
            CommandName = command.Name,
            Parameter = command.Parameter,
            ResourceName = resourceName,
            ResourceType = resourceType
        };

        try
        {
            using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken))
            {
                var reply = await _client!.ExecuteResourceCommandAsync(commandRequest, headers: _headers, cancellationToken: linkedCts.Token);

                return reply.ToViewModel();
            }
        }
        catch (RpcException ex)
        {
            _logger.LogError(ex, "Error executing command \"{CommandName}\" on resource \"{ResourceName}\": {StatusCode}", command.Name, resourceName, ex.StatusCode);

            // Only the "not implemented" status gets a specific message; everything else points at the logs.
            var failureMessage = ex.StatusCode switch
            {
                StatusCode.Unimplemented => "Command not implemented",
                _ => "Unknown error. See logs for details"
            };

            return new ResourceCommandResponseViewModel
            {
                Kind = ResourceCommandResponseKind.Failed,
                ErrorMessage = failureMessage
            };
        }
    }
 
    /// <summary>
    /// Disposes the client: drops all subscriber channels, cancels and disposes the client token
    /// source, disposes the gRPC channel, and waits for the background connection task to end.
    /// Only the first call does any work.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Whoever flips the state to Disposed first performs the teardown; everyone else returns.
        var previousState = Interlocked.Exchange(ref _state, StateDisposed);
        if (previousState == StateDisposed)
        {
            return;
        }

        _outgoingChannels = [];

        await _cts.CancelAsync().ConfigureAwait(false);

        _cts.Dispose();

        _channel?.Dispose();

        // Cancellation of the connection task is expected here; other failures are logged with this message.
        await TaskHelpers
            .WaitIgnoreCancelAsync(_connection, _logger, "Unexpected error from connection task.")
            .ConfigureAwait(false);
    }
 
    // Internal for testing.
    // TODO: Improve this in the future by making the client injected with DI and have it return data.
    /// <summary>
    /// Marks the initial data as received, optionally seeding the resource map first.
    /// </summary>
    /// <param name="initialData">
    /// Resources to add to (or replace in) the map, keyed by name. Nothing is seeded when <see langword="null"/>.
    /// </param>
    internal void SetInitialDataReceived(IList<Resource>? initialData = null)
    {
        if (initialData is { } seedResources)
        {
            lock (_lock)
            {
                foreach (var resource in seedResources)
                {
                    var viewModel = resource.ToViewModel(_timeProvider, _knownPropertyLookup);
                    _resourceByName[resource.Name] = viewModel;
                }
            }
        }

        // Signal waiters regardless of whether any data was supplied.
        _initialDataReceivedTcs.TrySetResult();
    }
}