|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Aspire.Hosting.ApplicationModel;
/// <summary>
/// A service that allows publishing and subscribing to changes in the state of a resource.
/// </summary>
public class ResourceNotificationService
{
// Resource state is keyed by the resource and the unique name of the resource. This could be the name of the resource, or a replica ID.
private readonly ConcurrentDictionary<(IResource, string), ResourceNotificationState> _resourceNotificationStates = new();
private readonly ILogger<ResourceNotificationService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly CancellationToken _applicationStopping;
private readonly ResourceLoggerService _resourceLoggerService;
private Action<ResourceEvent>? OnResourceUpdated { get; set; }
/// <summary>
/// Creates a new instance of <see cref="ResourceNotificationService"/>.
/// </summary>
/// <remarks>
/// Obsolete. Use the constructor that accepts an <see cref="ILogger{ResourceNotificationService}"/>, <see cref="IHostApplicationLifetime"/> and <see cref="IServiceProvider"/>.<br/>
/// This constructor will be removed in the next major version of Aspire.
/// </remarks>
/// <param name="logger">The logger.</param>
/// <param name="hostApplicationLifetime">The host application lifetime.</param>
[Obsolete($"""
{nameof(ResourceNotificationService)} now requires an {nameof(IServiceProvider)} and {nameof(ResourceLoggerService)}.
Use the constructor that accepts an {nameof(ILogger)}<{nameof(ResourceNotificationService)}>, {nameof(IHostApplicationLifetime)}, {nameof(IServiceProvider)} and {nameof(ResourceLoggerService)}.
This constructor will be removed in the next major version of Aspire.
""")]
public ResourceNotificationService(ILogger<ResourceNotificationService> logger, IHostApplicationLifetime hostApplicationLifetime)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = new NullServiceProvider();
_applicationStopping = hostApplicationLifetime?.ApplicationStopping ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_resourceLoggerService = new ResourceLoggerService();
}
/// <summary>
/// Creates a new instance of <see cref="ResourceNotificationService"/>.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="hostApplicationLifetime">The host application lifetime.</param>
/// <param name="resourceLoggerService">The resource logger service.</param>
/// <param name="serviceProvider">The service provider.</param>
public ResourceNotificationService(ILogger<ResourceNotificationService> logger, IHostApplicationLifetime hostApplicationLifetime, IServiceProvider serviceProvider, ResourceLoggerService resourceLoggerService)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider;
_applicationStopping = hostApplicationLifetime?.ApplicationStopping ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_resourceLoggerService = resourceLoggerService ?? throw new ArgumentNullException(nameof(resourceLoggerService));
}
private class NullServiceProvider : IServiceProvider
{
public object? GetService(Type serviceType) => null;
}
/// <summary>
/// Waits for a resource to reach the specified state. See <see cref="KnownResourceStates"/> for common states.
/// </summary>
/// <remarks>
/// This method returns a task that will complete when the resource reaches the specified target state. If the resource
/// is already in the target state, the method will return immediately.<br/>
/// If the resource doesn't reach one of the target states before <paramref name="cancellationToken"/> is signaled, this method
/// will throw <see cref="OperationCanceledException"/>.
/// </remarks>
/// <param name="resourceName">The name of the resource.</param>
/// <param name="targetState">The state to wait for the resource to transition to. See <see cref="KnownResourceStates"/> for common states.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/>.</param>
/// <returns>A <see cref="Task"/> representing the wait operation.</returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters",
Justification = "targetState(s) parameters are mutually exclusive.")]
public Task WaitForResourceAsync(string resourceName, string? targetState = null, CancellationToken cancellationToken = default)
{
string[] targetStates = !string.IsNullOrEmpty(targetState) ? [targetState] : [KnownResourceStates.Running];
return WaitForResourceAsync(resourceName, targetStates, cancellationToken);
}
/// <summary>
/// Waits for a resource to reach one of the specified states. See <see cref="KnownResourceStates"/> for common states.
/// </summary>
/// <remarks>
/// This method returns a task that will complete when the resource reaches one of the specified target states. If the resource
/// is already in the target state, the method will return immediately.<br/>
/// If the resource doesn't reach one of the target states before <paramref name="cancellationToken"/> is signaled, this method
/// will throw <see cref="OperationCanceledException"/>.
/// </remarks>
/// <param name="resourceName">The name of the resource.</param>
/// <param name="targetStates">The set of states to wait for the resource to transition to one of. See <see cref="KnownResourceStates"/> for common states.</param>
/// <param name="cancellationToken">A cancellation token that cancels the wait operation when signaled.</param>
/// <returns>A <see cref="Task{String}"/> representing the wait operation and which of the target states the resource reached.</returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters",
Justification = "targetState(s) parameters are mutually exclusive.")]
public async Task<string> WaitForResourceAsync(string resourceName, IEnumerable<string> targetStates, CancellationToken cancellationToken = default)
{
using var watchCts = CancellationTokenSource.CreateLinkedTokenSource(_applicationStopping, cancellationToken);
var watchToken = watchCts.Token;
await foreach (var resourceEvent in WatchAsync(watchToken).ConfigureAwait(false))
{
if (string.Equals(resourceName, resourceEvent.Resource.Name, StringComparisons.ResourceName)
&& resourceEvent.Snapshot.State?.Text is { Length: > 0 } statusText
&& targetStates.Contains(statusText, StringComparers.ResourceState))
{
return statusText;
}
}
throw new OperationCanceledException($"The operation was cancelled before the resource reached one of the target states: [{string.Join(", ", targetStates)}]");
}
private async Task WaitUntilHealthyAsync(IResource resource, IResource dependency, CancellationToken cancellationToken)
{
var resourceLogger = _resourceLoggerService.GetLogger(resource);
resourceLogger.LogInformation("Waiting for resource '{Name}' to enter the '{State}' state.", dependency.Name, KnownResourceStates.Running);
await PublishUpdateAsync(resource, s => s with { State = KnownResourceStates.Waiting }).ConfigureAwait(false);
var resourceEvent = await WaitForResourceAsync(dependency.Name, re => IsContinuableState(re.Snapshot), cancellationToken: cancellationToken).ConfigureAwait(false);
var snapshot = resourceEvent.Snapshot;
if (snapshot.State?.Text == KnownResourceStates.FailedToStart)
{
resourceLogger.LogError(
"Dependency resource '{ResourceName}' failed to start.",
dependency.Name
);
throw new DistributedApplicationException($"Dependency resource '{dependency.Name}' failed to start.");
}
else if (snapshot.State!.Text == KnownResourceStates.Finished || snapshot.State!.Text == KnownResourceStates.Exited)
{
resourceLogger.LogError(
"Resource '{ResourceName}' has entered the '{State}' state prematurely.",
dependency.Name,
snapshot.State.Text
);
throw new DistributedApplicationException(
$"Resource '{dependency.Name}' has entered the '{snapshot.State.Text}' state prematurely."
);
}
// If our dependency resource has health check annotations we want to wait until they turn healthy
// otherwise we don't care about their health status.
if (dependency.TryGetAnnotationsOfType<HealthCheckAnnotation>(out var _))
{
resourceLogger.LogInformation("Waiting for resource '{Name}' to become healthy.", dependency.Name);
await WaitForResourceHealthyAsync(dependency.Name, cancellationToken).ConfigureAwait(false);
}
resourceLogger.LogInformation("Finished waiting for resource '{Name}'.", dependency.Name);
static bool IsContinuableState(CustomResourceSnapshot snapshot) =>
snapshot.State?.Text == KnownResourceStates.Running ||
snapshot.State?.Text == KnownResourceStates.Finished ||
snapshot.State?.Text == KnownResourceStates.Exited ||
snapshot.State?.Text == KnownResourceStates.FailedToStart;
}
/// <summary>
/// Waits for a resource to become healthy.
/// </summary>
/// <param name="resourceName">The name of the resource.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task.</returns>
/// <remarks>
/// This method returns a task that will complete with the resource is healthy. A resource
/// without <see cref="HealthCheckAnnotation"/> annotations will be considered healthy.
/// </remarks>
public Task<ResourceEvent> WaitForResourceHealthyAsync(string resourceName, CancellationToken cancellationToken = default)
{
return WaitForResourceAsync(resourceName, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cancellationToken: cancellationToken);
}
private async Task WaitUntilCompletionAsync(IResource resource, IResource dependency, int exitCode, CancellationToken cancellationToken)
{
if (dependency.TryGetLastAnnotation<ReplicaAnnotation>(out var replicaAnnotation) && replicaAnnotation.Replicas > 1)
{
throw new DistributedApplicationException("WaitForCompletion cannot be used with resources that have replicas.");
}
var resourceLogger = _resourceLoggerService.GetLogger(resource);
resourceLogger.LogInformation("Waiting for resource '{Name}' to complete.", dependency.Name);
await PublishUpdateAsync(resource, s => s with { State = KnownResourceStates.Waiting }).ConfigureAwait(false);
var resourceEvent = await WaitForResourceAsync(dependency.Name, re => IsKnownTerminalState(re.Snapshot), cancellationToken: cancellationToken).ConfigureAwait(false);
var snapshot = resourceEvent.Snapshot;
if (snapshot.State?.Text == KnownResourceStates.FailedToStart)
{
resourceLogger.LogError(
"Dependency resource '{ResourceName}' failed to start.",
dependency.Name
);
throw new DistributedApplicationException($"Dependency resource '{dependency.Name}' failed to start.");
}
else if ((snapshot.State!.Text == KnownResourceStates.Finished || snapshot.State!.Text == KnownResourceStates.Exited) && snapshot.ExitCode is not null && snapshot.ExitCode != exitCode)
{
resourceLogger.LogError(
"Resource '{ResourceName}' has entered the '{State}' state with exit code '{ExitCode}' expected '{ExpectedExitCode}'.",
dependency.Name,
snapshot.State.Text,
snapshot.ExitCode,
exitCode
);
throw new DistributedApplicationException(
$"Resource '{dependency.Name}' has entered the '{snapshot.State.Text}' state with exit code '{snapshot.ExitCode}', expected '{exitCode}'."
);
}
resourceLogger.LogInformation("Finished waiting for resource '{Name}'.", dependency.Name);
static bool IsKnownTerminalState(CustomResourceSnapshot snapshot) =>
KnownResourceStates.TerminalStates.Contains(snapshot.State?.Text) ||
snapshot.ExitCode is not null;
}
/// <summary>
/// Waits for all dependencies of the resource to be ready.
/// </summary>
/// <param name="resource">The resource with dependencies to wait for.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task.</returns>
/// <exception cref="DistributedApplicationException"></exception>
public async Task WaitForDependenciesAsync(IResource resource, CancellationToken cancellationToken)
{
if (!resource.TryGetAnnotationsOfType<WaitAnnotation>(out var waitAnnotations))
{
return;
}
var pendingDependencies = new List<Task>();
foreach (var waitAnnotation in waitAnnotations)
{
var pendingDependency = waitAnnotation.WaitType switch
{
WaitType.WaitUntilHealthy => WaitUntilHealthyAsync(resource, waitAnnotation.Resource, cancellationToken),
WaitType.WaitForCompletion => WaitUntilCompletionAsync(resource, waitAnnotation.Resource, waitAnnotation.ExitCode, cancellationToken),
_ => throw new DistributedApplicationException($"Unexpected wait type: {waitAnnotation.WaitType}")
};
pendingDependencies.Add(pendingDependency);
}
await Task.WhenAll(pendingDependencies).ConfigureAwait(false);
}
/// <summary>
/// Waits until a resource satisfies the specified predicate.
/// </summary>
/// <remarks>
/// This method returns a task that will complete when the specified predicate returns <see langword="true" />.<br/>
/// If the predicate isn't satisfied before <paramref name="cancellationToken"/> is signaled, this method
/// will throw <see cref="OperationCanceledException"/>.
/// </remarks>
/// <param name="resourceName">The name of the resource.</param>
/// <param name="predicate">A predicate which is evaluated for each <see cref="ResourceEvent"/> for the selected resource.</param>
/// <param name="cancellationToken">A cancellation token that cancels the wait operation when signaled.</param>
/// <returns>A <see cref="Task{ResourceEvent}"/> representing the wait operation and which of the target states the resource reached.</returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters",
Justification = "predicate and targetState(s) parameters are mutually exclusive.")]
public async Task<ResourceEvent> WaitForResourceAsync(string resourceName, Func<ResourceEvent, bool> predicate, CancellationToken cancellationToken = default)
{
using var watchCts = CancellationTokenSource.CreateLinkedTokenSource(_applicationStopping, cancellationToken);
var watchToken = watchCts.Token;
await foreach (var resourceEvent in WatchAsync(watchToken).ConfigureAwait(false))
{
if (string.Equals(resourceName, resourceEvent.Resource.Name, StringComparisons.ResourceName)
&& resourceEvent.Snapshot.State?.Text is { Length: > 0 } statusText
&& predicate(resourceEvent))
{
return resourceEvent;
}
}
throw new OperationCanceledException($"The operation was cancelled before the resource met the predicate condition.");
}
private readonly object _onResourceUpdatedLock = new();
/// <summary>
/// Watch for changes to the state for all resources.
/// </summary>
/// <returns></returns>
public async IAsyncEnumerable<ResourceEvent> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Return the last snapshot for each resource.
foreach (var state in _resourceNotificationStates)
{
var (resource, resourceId) = state.Key;
if (state.Value.LastSnapshot is not null)
{
yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot);
}
}
var channel = Channel.CreateUnbounded<ResourceEvent>();
void WriteToChannel(ResourceEvent resourceEvent) =>
channel.Writer.TryWrite(resourceEvent);
lock (_onResourceUpdatedLock)
{
OnResourceUpdated += WriteToChannel;
}
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
yield return item;
}
}
finally
{
lock (_onResourceUpdatedLock)
{
OnResourceUpdated -= WriteToChannel;
}
channel.Writer.TryComplete();
}
}
/// <summary>
/// Updates the snapshot of the <see cref="CustomResourceSnapshot"/> for a resource.
/// </summary>
/// <param name="resource">The resource to update</param>
/// <param name="resourceId"> The id of the resource.</param>
/// <param name="stateFactory">A factory that creates the new state based on the previous state.</param>
public Task PublishUpdateAsync(IResource resource, string resourceId, Func<CustomResourceSnapshot, CustomResourceSnapshot> stateFactory)
{
var notificationState = GetResourceNotificationState(resource, resourceId);
lock (notificationState)
{
var previousState = GetCurrentSnapshot(resource, notificationState);
var newState = stateFactory(previousState);
newState = UpdateCommands(resource, newState);
notificationState.LastSnapshot = newState;
OnResourceUpdated?.Invoke(new ResourceEvent(resource, resourceId, newState));
if (_logger.IsEnabled(LogLevel.Debug) && newState.State?.Text is { Length: > 0 } newStateText && !string.IsNullOrWhiteSpace(newStateText))
{
var previousStateText = previousState?.State?.Text;
if (!string.IsNullOrWhiteSpace(previousStateText) && !string.Equals(previousStateText, newStateText, StringComparison.OrdinalIgnoreCase))
{
// The state text has changed from the previous state
_logger.LogDebug("Resource {Resource}/{ResourceId} changed state: {PreviousState} -> {NewState}", resource.Name, resourceId, previousStateText, newStateText);
}
else if (string.IsNullOrWhiteSpace(previousStateText))
{
// There was no previous state text so just log the new state
_logger.LogDebug("Resource {Resource}/{ResourceId} changed state: {NewState}", resource.Name, resourceId, newStateText);
}
}
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace("Resource {Resource}/{ResourceId} update published: " +
"ResourceType = {ResourceType}, CreationTimeStamp = {CreationTimeStamp:s}, State = {{ Text = {StateText}, Style = {StateStyle} }}, " +
"ExitCode = {ExitCode}, EnvironmentVariables = {{ {EnvironmentVariables} }}, Urls = {{ {Urls} }}, " +
"Properties = {{ {Properties} }}",
resource.Name, resourceId,
newState.ResourceType, newState.CreationTimeStamp, newState.State?.Text, newState.State?.Style,
newState.ExitCode, string.Join(", ", newState.EnvironmentVariables.Select(e => $"{e.Name} = {e.Value}")), string.Join(", ", newState.Urls.Select(u => $"{u.Name} = {u.Url}")),
string.Join(", ", newState.Properties.Select(p => $"{p.Name} = {p.Value}")));
}
}
return Task.CompletedTask;
}
/// <summary>
/// Use command annotations to update resource snapshot.
/// </summary>
private CustomResourceSnapshot UpdateCommands(IResource resource, CustomResourceSnapshot previousState)
{
ImmutableArray<ResourceCommandSnapshot>.Builder? builder = null;
foreach (var annotation in resource.Annotations.OfType<ResourceCommandAnnotation>())
{
var existingCommand = FindByName(previousState.Commands, annotation.Name);
if (existingCommand == null)
{
if (builder == null)
{
builder = ImmutableArray.CreateBuilder<ResourceCommandSnapshot>(previousState.Commands.Length);
builder.AddRange(previousState.Commands);
}
// Command doesn't exist in snapshot. Create from annotation.
builder.Add(CreateCommandFromAnnotation(annotation, previousState, _serviceProvider));
}
else
{
// Command already exists in snapshot. Update its state based on annotation callback.
var newState = annotation.UpdateState(new UpdateCommandStateContext { ResourceSnapshot = previousState, ServiceProvider = _serviceProvider });
if (existingCommand.State != newState)
{
if (builder == null)
{
builder = ImmutableArray.CreateBuilder<ResourceCommandSnapshot>(previousState.Commands.Length);
builder.AddRange(previousState.Commands);
}
var newCommand = existingCommand with
{
State = newState
};
builder.Replace(existingCommand, newCommand);
}
}
}
// Commands are unchanged. Return unchanged state.
if (builder == null)
{
return previousState;
}
return previousState with { Commands = builder.ToImmutable() };
static ResourceCommandSnapshot? FindByName(ImmutableArray<ResourceCommandSnapshot> commands, string name)
{
for (var i = 0; i < commands.Length; i++)
{
if (commands[i].Name == name)
{
return commands[i];
}
}
return null;
}
static ResourceCommandSnapshot CreateCommandFromAnnotation(ResourceCommandAnnotation annotation, CustomResourceSnapshot previousState, IServiceProvider serviceProvider)
{
var state = annotation.UpdateState(new UpdateCommandStateContext { ResourceSnapshot = previousState, ServiceProvider = serviceProvider });
return new ResourceCommandSnapshot(annotation.Name, state, annotation.DisplayName, annotation.DisplayDescription, annotation.Parameter, annotation.ConfirmationMessage, annotation.IconName, annotation.IconVariant, annotation.IsHighlighted);
}
}
/// <summary>
/// Updates the snapshot of the <see cref="CustomResourceSnapshot"/> for a resource.
/// </summary>
/// <param name="resource">The resource to update</param>
/// <param name="stateFactory">A factory that creates the new state based on the previous state.</param>
public async Task PublishUpdateAsync(IResource resource, Func<CustomResourceSnapshot, CustomResourceSnapshot> stateFactory)
{
var resourceNames = resource.GetResolvedResourceNames();
foreach (var resourceName in resourceNames)
{
await PublishUpdateAsync(resource, resourceName, stateFactory).ConfigureAwait(false);
}
}
private static CustomResourceSnapshot GetCurrentSnapshot(IResource resource, ResourceNotificationState notificationState)
{
var previousState = notificationState.LastSnapshot;
if (previousState is null)
{
if (resource.Annotations.OfType<ResourceSnapshotAnnotation>().LastOrDefault() is { } annotation)
{
previousState = annotation.InitialSnapshot;
}
// If there is no initial snapshot, create an empty one.
previousState ??= new CustomResourceSnapshot()
{
ResourceType = resource.GetType().Name,
Properties = [],
Relationships = ResourceSnapshotBuilder.BuildRelationships(resource)
};
}
return previousState;
}
private ResourceNotificationState GetResourceNotificationState(IResource resource, string resourceId) =>
_resourceNotificationStates.GetOrAdd((resource, resourceId), _ => new ResourceNotificationState());
/// <summary>
/// The annotation that allows publishing and subscribing to changes in the state of a resource.
/// </summary>
private sealed class ResourceNotificationState
{
public CustomResourceSnapshot? LastSnapshot { get; set; }
}
}
/// <summary>
/// Represents a change in the state of a resource.
/// </summary>
/// <param name="resource">The resource associated with the event.</param>
/// <param name="resourceId">The unique id of the resource.</param>
/// <param name="snapshot">The snapshot of the resource state.</param>
public class ResourceEvent(IResource resource, string resourceId, CustomResourceSnapshot snapshot)
{
/// <summary>
/// The resource associated with the event.
/// </summary>
public IResource Resource { get; } = resource;
/// <summary>
/// The unique id of the resource.
/// </summary>
public string ResourceId { get; } = resourceId;
/// <summary>
/// The snapshot of the resource state.
/// </summary>
public CustomResourceSnapshot Snapshot { get; } = snapshot;
}
|