// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Immutable;
using System.Diagnostics;
using System.Net.Security;
using System.Runtime.CompilerServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Channels;
using Aspire.Dashboard.Configuration;
using Aspire.Dashboard.Utils;
using Aspire.Hosting;
using Aspire.ResourceService.Proto.V1;
using Grpc.Core;
using Grpc.Net.Client;
using Grpc.Net.Client.Configuration;
using Microsoft.Extensions.Options;
namespace Aspire.Dashboard.Model;
/// <summary>
/// Implements gRPC client that communicates with a resource server, populating data for the dashboard.
/// </summary>
/// <remarks>
/// <para>
/// An instance of this type is created per service call, so this class should not hold onto any state
/// expected to live longer than a single RPC request. In the case of streaming requests, the instance
/// lives until the stream is closed.
/// </para>
/// <para>
/// If the <c>DOTNET_RESOURCE_SERVICE_ENDPOINT_URL</c> environment variable is not specified, then there's
/// no known endpoint to connect to, and this dashboard client will be disabled. Calls to
/// <see cref="IDashboardClient.SubscribeResourcesAsync"/> and <see cref="IDashboardClient.SubscribeConsoleLogs"/>
/// will throw if <see cref="IDashboardClientStatus.IsEnabled"/> is <see langword="false"/>. Callers should
/// check this property first, before calling these methods.
/// </para>
/// </remarks>
internal sealed class DashboardClient : IDashboardClient
// Name of the gRPC metadata header used to send the API key when AuthMode is ApiKey.
private const string ApiKeyHeaderName = "x-resource-service-api-key";
// Most recent view model for each resource, keyed by resource name.
// The reads and writes visible in this file all follow a lock (_lock) statement.
private readonly Dictionary<string, ResourceViewModel> _resourceByName = new(StringComparers.ResourceName);
// Cancelled in DisposeAsync to stop the client's background work.
private readonly CancellationTokenSource _cts = new();
// Copy of _cts.Token captured once in the constructor; linked into subscription and command calls.
private readonly CancellationToken _clientCancellationToken;
// Backs the task returned by IDashboardClient.WhenConnected.
private readonly TaskCompletionSource _whenConnectedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
// Awaited by SubscribeResourcesAsync before it snapshots the initial resource state.
private readonly TaskCompletionSource _initialDataReceivedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
// Lock object taken around _resourceByName access and subscriber registration.
private readonly object _lock = new();
private readonly ILoggerFactory _loggerFactory;
private readonly IDashboardClientStatus _dashboardClientStatus;
private readonly BrowserTimeProvider _timeProvider;
private readonly IKnownPropertyLookup _knownPropertyLookup;
private readonly DashboardOptions _dashboardOptions;
private readonly ILogger<DashboardClient> _logger;
// One producer/consumer channel per active resource subscription; modified via ImmutableInterlocked.
private ImmutableHashSet<Channel<IReadOnlyList<ResourceViewModelChange>>> _outgoingChannels = [];
// Application name reported by the resource service; stays null until ConnectAsync stores it.
private string? _applicationName;
// Lifecycle values stored in _state.
private const int StateDisabled = -1;
private const int StateNone = 0;
private const int StateInitialized = 1;
private const int StateDisposed = 2;
// Current lifecycle state; changed with Interlocked operations in EnsureInitialized and DisposeAsync.
private int _state = StateNone;
// gRPC channel and client. Nullable: the constructor assigns them after its IsEnabled check.
private readonly GrpcChannel? _channel;
private readonly DashboardService.DashboardServiceClient? _client;
// Metadata passed on each gRPC call; receives the API key header when AuthMode is ApiKey.
private readonly Metadata _headers = [];
// Background task started by EnsureInitialized and awaited in DisposeAsync.
private Task? _connection;
// Initializes the client from the supplied services and options. When the client status reports it is
// not enabled, the state is set to StateDisabled and a debug message is logged. Otherwise the gRPC
// channel and service client are created for the configured resource service address.
//
// configuration: bound onto the HTTP handler's SSL options by CreateChannel.
// configureHttpHandler: optional hook for the SocketsHttpHandler.
// NOTE(review): no invocation of configureHttpHandler is visible in this listing — confirm it is called.
public DashboardClient(
ILoggerFactory loggerFactory,
IConfiguration configuration,
IOptions<DashboardOptions> dashboardOptions,
IDashboardClientStatus dashboardClientStatus,
BrowserTimeProvider timeProvider,
IKnownPropertyLookup knownPropertyLookup,
Action<SocketsHttpHandler>? configureHttpHandler = null)
_loggerFactory = loggerFactory;
_dashboardClientStatus = dashboardClientStatus;
_timeProvider = timeProvider;
_knownPropertyLookup = knownPropertyLookup;
_dashboardOptions = dashboardOptions.Value;
// Take a copy of the token and always use it to avoid race between disposal of CTS and usage of token.
_clientCancellationToken = _cts.Token;
_logger = loggerFactory.CreateLogger<DashboardClient>();
// Disabled client: record the state and explain why in the debug log.
if (!_dashboardClientStatus.IsEnabled)
_state = StateDisabled;
_logger.LogDebug($"{DashboardConfigNames.ResourceServiceUrlName.ConfigKey} is not specified. Dashboard client services are unavailable.");
// NOTE(review): the statements below presumably run only when the client is enabled; the branch
// boundary (and any early return) is not visible in this listing.
var address = _dashboardOptions.ResourceServiceClient.GetUri()!;
_logger.LogDebug("Dashboard configured to connect to: {Address}", address);
// Create the gRPC channel. This channel performs automatic reconnects.
// We will dispose it when we are disposed.
_channel = CreateChannel();
if (_dashboardOptions.ResourceServiceClient.AuthMode is ResourceClientAuthMode.ApiKey)
// We're using an API key for auth, so set it in the headers we pass on each call.
_headers.Add(ApiKeyHeaderName, _dashboardOptions.ResourceServiceClient.ApiKey!);
_client = new DashboardService.DashboardServiceClient(_channel);
// Builds the gRPC channel: a SocketsHttpHandler with keep-alive pings, client-certificate
// authentication when AuthMode is Certificate, and a default retry policy.
GrpcChannel CreateChannel()
var httpHandler = new SocketsHttpHandler
EnableMultipleHttp2Connections = true,
// Keep-alive ping settings: 20 second delay, 10 second timeout, policy WithActiveRequests.
KeepAlivePingDelay = TimeSpan.FromSeconds(20),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
var authMode = _dashboardOptions.ResourceServiceClient.AuthMode;
if (authMode == ResourceClientAuthMode.Certificate)
// Auth hasn't been suppressed, so configure it.
// The certificate comes from a file or from a key store, depending on the configured source;
// any other source value is rejected with InvalidOperationException.
var certificates = _dashboardOptions.ResourceServiceClient.ClientCertificate.Source switch
DashboardClientCertificateSource.File => GetFileCertificate(),
DashboardClientCertificateSource.KeyStore => GetKeyStoreCertificate(),
_ => throw new InvalidOperationException("Unable to load ResourceServiceClient client certificate.")
httpHandler.SslOptions = new SslClientAuthenticationOptions
ClientCertificates = certificates
// Bind the "Dashboard:ResourceServiceClient:Ssl" configuration section onto the handler's SSL options.
configuration.Bind("Dashboard:ResourceServiceClient:Ssl", httpHandler.SslOptions);
// https://learn.microsoft.com/aspnet/core/grpc/retries
// Default method config: at most 5 attempts, backoff starting at 1 second, multiplied by 1.5 and
// capped at 5 seconds; only StatusCode.Unavailable is retried.
var methodConfig = new MethodConfig
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
// https://learn.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
// NOTE(review): the address argument of ForAddress is not visible in this listing; presumably it is
// the `address` local computed in the constructor — confirm.
return GrpcChannel.ForAddress(
channelOptions: new()
HttpHandler = httpHandler,
ServiceConfig = new() { MethodConfigs = { methodConfig } },
LoggerFactory = _loggerFactory,
ThrowOperationCanceledOnCancellation = true
// Loads the client certificate from the configured file path using the configured password and
// returns it as a single-element collection.
X509CertificateCollection GetFileCertificate()
// NOTE(review): the next two lines read as the arguments of an assertion whose opening line is not
// visible in this listing.
_dashboardOptions.ResourceServiceClient.ClientCertificate.FilePath != null,
"FilePath is validated as not null when configuration is loaded.");
var filePath = _dashboardOptions.ResourceServiceClient.ClientCertificate.FilePath;
var password = _dashboardOptions.ResourceServiceClient.ClientCertificate.Password;
return [new X509Certificate2(filePath, password)];
// Finds valid client certificates by subject name in a certificate store. The store name defaults to
// "My" and the location to StoreLocation.CurrentUser when not configured. Throws
// InvalidOperationException when no certificate matches.
X509CertificateCollection GetKeyStoreCertificate()
// NOTE(review): the next two lines read as the arguments of an assertion whose opening line is not
// visible in this listing.
_dashboardOptions.ResourceServiceClient.ClientCertificate.Subject != null,
"Subject is validated as not null when configuration is loaded.");
var subject = _dashboardOptions.ResourceServiceClient.ClientCertificate.Subject;
var storeName = _dashboardOptions.ResourceServiceClient.ClientCertificate.Store ?? "My";
var location = _dashboardOptions.ResourceServiceClient.ClientCertificate.Location ?? StoreLocation.CurrentUser;
using var store = new X509Store(storeName: storeName, storeLocation: location);
// NOTE(review): no call that opens the store is visible before this lookup — confirm the store is
// opened (read-only) before Certificates is queried.
var certificates = store.Certificates.Find(X509FindType.FindBySubjectName, findValue: subject, validOnly: true);
if (certificates is [])
throw new InvalidOperationException($"Unable to load client certificate with subject \"{subject}\" from key store.");
return certificates;
/// <summary>
/// Identifies a certificate store by its name and location.
/// </summary>
internal sealed class KeyStoreProperties
{
    /// <summary>Gets or sets the name of the certificate store.</summary>
    public required string Name { get; set; }

    /// <summary>Gets or sets the location of the certificate store.</summary>
    public required StoreLocation Location { get; set; }
}
// Test hook: the number of subscriber channels currently held in _outgoingChannels.
internal int OutgoingResourceSubscriberCount
{
    get { return _outgoingChannels.Count; }
}
/// <summary>
/// Gets a value indicating whether this client is enabled, i.e. its state is anything other than <c>StateDisabled</c>.
/// </summary>
public bool IsEnabled
{
    get { return _state != StateDisabled; }
}
// Starts the background connection the first time it is called. The state is moved from StateNone to
// StateInitialized with a compare-exchange, so only one caller observes StateNone.
// Throws InvalidOperationException when the client is disabled and ObjectDisposedException when disposed.
private void EnsureInitialized()
var priorState = Interlocked.CompareExchange(ref _state, value: StateInitialized, comparand: StateNone);
if (priorState is StateDisabled)
throw new InvalidOperationException($"{nameof(DashboardClient)} is disabled. Check the {nameof(IsEnabled)} property before calling this.");
if (priorState is not StateNone)
// Not the first call: fail if disposed.
// NOTE(review): an early return for the already-initialized case presumably follows; it is not
// visible in this listing.
ObjectDisposedException.ThrowIf(priorState is StateDisposed, this);
// Run the connect-and-watch loop on the thread pool, tied to the client's cancellation token.
_connection = Task.Run(() => ConnectAndWatchResourcesAsync(_clientCancellationToken), _clientCancellationToken);
// Connects to the resource service and then watches resources. An OperationCanceledException raised
// while cancellation is requested is ignored; any other exception is logged as an error.
async Task ConnectAndWatchResourcesAsync(CancellationToken cancellationToken)
await ConnectAsync().ConfigureAwait(false);
await WatchResourcesWithRecoveryAsync().ConfigureAwait(false);
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
// Ignore. This is likely caused by the dashboard client being disposed. We don't want to log.
catch (Exception ex)
_logger.LogError(ex, "Error loading data from the resource service.");
// Requests application information from the resource service and stores the reported application name.
async Task ConnectAsync()
var response = await _client!.GetApplicationInformationAsync(new(), headers: _headers, cancellationToken: cancellationToken);
_applicationName = response.ApplicationName;
catch (Exception ex)
// NOTE(review): the body of this handler is not visible in this listing — confirm how the failure is
// surfaced (for example to callers awaiting WhenConnected) and whether it is rethrown.
// Repeatedly runs WatchResourcesAsync, waiting between attempts after failures. The wait is computed
// by ExponentialBackOff from the current error count with a 15 second cap.
async Task WatchResourcesWithRecoveryAsync()
// Track the number of errors we've seen since the last successfully received message.
// As this number climbs, we extend the amount of time between reconnection attempts, in
// order to avoid flooding the server with requests. This value is reset to zero whenever
// a message is successfully received.
var errorCount = 0;
while (true)
if (errorCount > 0)
// The most recent attempt failed. There may be more than one failure.
// We wait for a period of time determined by the number of errors,
// where the time grows exponentially, until a threshold.
var delay = ExponentialBackOff(errorCount, maxSeconds: 15);
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
await WatchResourcesAsync().ConfigureAwait(false);
catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
// There's a race condition between reconnect attempts and client disposal.
// This has been observed in unit tests where the client is created and disposed
// very quickly. This check should probably be in the gRPC library instead.
catch (RpcException ex)
// NOTE(review): errorCount is presumably incremented in this handler before logging; the increment
// is not visible in this listing.
_logger.LogError(ex, "Error #{ErrorCount} watching resources.", errorCount);
// Computes the wait before the next reconnection attempt: 2^(errorCount - 1) seconds,
// capped at maxSeconds.
static TimeSpan ExponentialBackOff(int errorCount, double maxSeconds)
{
    var uncappedSeconds = Math.Pow(2, errorCount - 1);
    return TimeSpan.FromSeconds(Math.Min(uncappedSeconds, maxSeconds));
}
// Opens the WatchResources stream and applies each update to _resourceByName, collecting the
// resulting changes for subscribers. IsReconnect is set on the request when errorCount is non-zero.
// Throws FormatException for an update or change kind it does not recognize.
async Task WatchResourcesAsync()
var call = _client!.WatchResources(new WatchResourcesRequest { IsReconnect = errorCount != 0 }, headers: _headers, cancellationToken: cancellationToken);
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
// Changes produced by this message; stays null when the message yields none.
List<ResourceViewModelChange>? changes = null;
lock (_lock)
// We received a message, which means we are connected. Clear the error count.
errorCount = 0;
if (response.KindCase == WatchResourcesUpdate.KindOneofCase.InitialData)
// Populate our map using the initial data.
// TODO send a "clear" event via outgoing channels, in case consumers have extra items to be removed
foreach (var resource in response.InitialData.Resources)
// Add to map.
var viewModel = resource.ToViewModel(_timeProvider, _knownPropertyLookup);
_resourceByName[resource.Name] = viewModel;
// Send this update to any subscribers too.
changes ??= [];
changes.Add(new(ResourceViewModelChangeType.Upsert, viewModel));
else if (response.KindCase == WatchResourcesUpdate.KindOneofCase.Changes)
// Apply changes to the model.
foreach (var change in response.Changes.Value)
changes ??= [];
if (change.KindCase == WatchResourcesChange.KindOneofCase.Upsert)
// Upsert (i.e. add or replace)
var viewModel = change.Upsert.ToViewModel(_timeProvider, _knownPropertyLookup);
_resourceByName[change.Upsert.Name] = viewModel;
changes.Add(new(ResourceViewModelChangeType.Upsert, viewModel));
else if (change.KindCase == WatchResourcesChange.KindOneofCase.Delete)
// Remove
if (_resourceByName.Remove(change.Delete.ResourceName, out var removed))
changes.Add(new(ResourceViewModelChangeType.Delete, removed));
// NOTE(review): the failure and exception statements below presumably sit in `else` branches that
// are not visible in this listing.
Debug.Fail("Attempt to remove an unknown resource view model.");
throw new FormatException($"Unexpected {nameof(WatchResourcesChange)} kind: {change.KindCase}");
throw new FormatException($"Unexpected {nameof(WatchResourcesUpdate)} kind: {response.KindCase}");
// Forward the collected changes to every registered subscriber channel.
if (changes is not null)
foreach (var channel in _outgoingChannels)
// Channel is unbounded so TryWrite always succeeds.
// NOTE(review): the write to the channel is not visible in this listing.
// Task backed by _whenConnectedTcs that callers await before using connection-dependent data.
// NOTE(review): per the comment below, the getter presumably starts the connection before returning;
// that call is not visible in this listing.
Task IDashboardClient.WhenConnected
// All pages wait for this task (it is used to display the title) but some don't subscribe to resources.
// If someone is waiting for the connection, we need to ensure connection is starting.
return _whenConnectedTcs.Task;
/// <summary>
/// Gets the application name to display: the name received from the resource service if there is one,
/// otherwise the application name from the dashboard options, otherwise "Aspire".
/// </summary>
string IDashboardClient.ApplicationName
{
    get => _applicationName
        ?? _dashboardOptions.ApplicationName
        ?? "Aspire";
}
// Waits for the initial resource data, registers a new subscriber channel, and returns a snapshot of
// the current resources together with a stream of subsequent change batches. The wait and the stream
// are cancelled by either the caller's token or the client's own token.
async Task<ResourceViewModelSubscription> IDashboardClient.SubscribeResourcesAsync(CancellationToken cancellationToken)
// NOTE(review): no Dispose of this linked CancellationTokenSource is visible in this listing — confirm
// it is released when the subscription ends, including when the wait below throws.
var cts = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);
// Wait for initial data to be received from the server. This allows initial data to be returned with subscription when client is starting.
await _initialDataReceivedTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
// There are two types of channel in this class. This is not a gRPC channel.
// It's a producer-consumer queue channel, used to push updates to subscribers
// without blocking the producer here.
var channel = Channel.CreateUnbounded<IReadOnlyList<ResourceViewModelChange>>(
new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });
// Register the channel and snapshot the resources after taking the same lock the update loop takes.
lock (_lock)
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Add(channel), channel);
return new ResourceViewModelSubscription(
InitialState: _resourceByName.Values.ToImmutableArray(),
Subscription: StreamUpdatesAsync(cts.Token));
// Yields change lists read from the subscriber channel in batches, using a minimum read interval of
// 100 ms. A batch holding a single list is yielded as-is; a larger batch is flattened into one list.
// The channel is removed from _outgoingChannels by the last statement.
async IAsyncEnumerable<IReadOnlyList<ResourceViewModelChange>> StreamUpdatesAsync([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
await foreach (var batch in channel.GetBatchesAsync(minReadInterval: TimeSpan.FromMilliseconds(100), cancellationToken: enumeratorCancellationToken).ConfigureAwait(false))
if (batch.Count == 1)
yield return batch[0];
// NOTE(review): the flattening below presumably sits in an `else` branch not visible in this listing.
yield return batch.SelectMany(batch => batch).ToList();
// NOTE(review): presumably runs in a `finally` so the channel is unregistered however enumeration ends — confirm.
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
// Streams console log lines for the named resource. A background task reads the gRPC response stream
// and converts each response into ResourceLogLine values; this method yields them in batches read from
// an intermediate channel. Cancelled by either the caller's token or the client's own token.
async IAsyncEnumerable<IReadOnlyList<ResourceLogLine>> IDashboardClient.SubscribeConsoleLogs(string resourceName, [EnumeratorCancellation] CancellationToken cancellationToken)
using var combinedTokens = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);
var call = _client!.WatchResourceConsoleLogs(
new WatchResourceConsoleLogsRequest() { ResourceName = resourceName },
headers: _headers,
cancellationToken: combinedTokens.Token);
// Write incoming logs to a channel, and then read from that channel to yield the logs.
// We do this to batch logs together and enforce a minimum read interval.
var channel = Channel.CreateUnbounded<IReadOnlyList<ResourceLogLine>>(
new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });
var readTask = Task.Run(async () =>
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken: combinedTokens.Token).ConfigureAwait(false))
// Convert each received log line into a ResourceLogLine.
var logLines = new ResourceLogLine[response.LogLines.Count];
for (var i = 0; i < logLines.Length; i++)
logLines[i] = new ResourceLogLine(response.LogLines[i].LineNumber, response.LogLines[i].Text, response.LogLines[i].IsStdErr);
// Channel is unbounded so TryWrite always succeeds.
// NOTE(review): the write to the channel, and whatever completes the channel when the stream ends,
// are not visible in this listing.
}, combinedTokens.Token);
// Yield batches using a 100 ms interval; a batch holding a single list is yielded as-is.
await foreach (var batch in channel.GetBatchesAsync(TimeSpan.FromMilliseconds(100), combinedTokens.Token).ConfigureAwait(false))
if (batch.Count == 1)
yield return batch[0];
// NOTE(review): the flattening below presumably sits in an `else` branch not visible in this listing.
yield return batch.SelectMany(batch => batch).ToList();
// Observe the background reader's outcome before finishing.
await readTask.ConfigureAwait(false);
// Sends a command for the given resource to the resource service and returns the response as a view
// model. An RpcException is logged and converted into a Failed response: "Command not implemented" for
// StatusCode.Unimplemented, otherwise a generic message pointing at the logs.
public async Task<ResourceCommandResponseViewModel> ExecuteResourceCommandAsync(string resourceName, string resourceType, CommandViewModel command, CancellationToken cancellationToken)
var request = new ResourceCommandRequest()
CommandName = command.Name,
Parameter = command.Parameter,
ResourceName = resourceName,
ResourceType = resourceType
// The call is cancelled by either the caller's token or the client's own token.
using var combinedTokens = CancellationTokenSource.CreateLinkedTokenSource(_clientCancellationToken, cancellationToken);
var response = await _client!.ExecuteResourceCommandAsync(request, headers: _headers, cancellationToken: combinedTokens.Token);
return response.ToViewModel();
catch (RpcException ex)
_logger.LogError(ex, "Error executing command \"{CommandName}\" on resource \"{ResourceName}\": {StatusCode}", command.Name, resourceName, ex.StatusCode);
var errorMessage = ex.StatusCode == StatusCode.Unimplemented ? "Command not implemented" : "Unknown error. See logs for details";
return new ResourceCommandResponseViewModel()
Kind = ResourceCommandResponseKind.Failed,
ErrorMessage = errorMessage
// Disposes the client once: the state is swapped to StateDisposed, and only the caller that performs
// the transition clears the subscriber channels, cancels the client token and waits for the connection task.
// NOTE(review): disposal of _cts and of the gRPC channel is not visible in this listing — confirm both
// are released (the constructor comment says the channel is disposed with the client).
public async ValueTask DisposeAsync()
if (Interlocked.Exchange(ref _state, StateDisposed) is not StateDisposed)
_outgoingChannels = [];
await _cts.CancelAsync().ConfigureAwait(false);
await TaskHelpers.WaitIgnoreCancelAsync(_connection, _logger, "Unexpected error from connection task.").ConfigureAwait(false);
// Internal for testing.
// TODO: Improve this in the future by making the client injected with DI and have it return data.
// When initial data is supplied, each resource is converted to a view model and stored in
// _resourceByName, keyed by name, after taking _lock.
// NOTE(review): the remainder of this method (presumably signalling that initial data has been
// received) is not visible in this listing.
internal void SetInitialDataReceived(IList<Resource>? initialData = null)
if (initialData != null)
lock (_lock)
foreach (var data in initialData)
_resourceByName[data.Name] = data.ToViewModel(_timeProvider, _knownPropertyLookup);