File: Health\ResourceHealthCheckService.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Frozen;
using System.Collections.Immutable;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Eventing;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
 
namespace Aspire.Hosting.Health;
 
internal class ResourceHealthCheckService(ILogger<ResourceHealthCheckService> logger, ResourceNotificationService resourceNotificationService, HealthCheckService healthCheckService, IServiceProvider services, IDistributedApplicationEventing eventing, TimeProvider timeProvider) : BackgroundService
{
    private readonly Dictionary<string, ResourceMonitorState> _resourceMonitoringStates = new();
 
    // Internal for testing.
    internal TimeSpan HealthyHealthCheckInterval { get; set; } = TimeSpan.FromSeconds(30);
    internal TimeSpan NonHealthyHealthCheckStepInterval { get; set; } = TimeSpan.FromSeconds(1);
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var resourceEvents = resourceNotificationService.WatchAsync(stoppingToken);
 
            // Watch for resource notifications and start and stop health monitoring based on the state of the resource.
            await foreach (var resourceEvent in resourceEvents.ConfigureAwait(false))
            {
                var resourceName = resourceEvent.Resource.Name;
                ResourceMonitorState? state;
 
                lock (_resourceMonitoringStates)
                {
                    if (_resourceMonitoringStates.TryGetValue(resourceName, out state))
                    {
                        state.SetLatestEvent(resourceEvent);
                    }
                }
 
                if (resourceEvent.Snapshot.State?.Text == KnownResourceStates.Running)
                {
                    if (state == null)
                    {
                        // The resource has entered a running state so it's time to start monitoring it's health.
                        state = new ResourceMonitorState(logger, resourceEvent, stoppingToken);
 
                        lock (_resourceMonitoringStates)
                        {
                            _resourceMonitoringStates[resourceName] = state;
                        }
 
                        _ = Task.Run(() => MonitorResourceHealthAsync(state), state.CancellationToken);
                    }
                }
                else if (KnownResourceStates.TerminalStates.Contains(resourceEvent.Snapshot.State?.Text))
                {
                    if (state != null)
                    {
                        // The resource is in a terminal state, so we can stop monitoring it.
                        state.StopResourceMonitor();
                        lock (_resourceMonitoringStates)
                        {
                            _resourceMonitoringStates.Remove(resourceName);
                        }
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // This was expected as the token was canceled
        }
    }
 
    // Internal for testing.
    internal ResourceMonitorState? GetResourceMonitorState(string resourceName)
    {
        lock (_resourceMonitoringStates)
        {
            _resourceMonitoringStates.TryGetValue(resourceName, out var state);
            return state;
        }
    }
 
    private async Task MonitorResourceHealthAsync(ResourceMonitorState state)
    {
        var cancellationToken = state.CancellationToken;
        var resource = state.LatestEvent.Resource;
 
        if (!resource.TryGetAnnotationsIncludingAncestorsOfType<HealthCheckAnnotation>(out var annotations))
        {
            // NOTE: If there are no health check annotations then there
            //       is currently nothing to monitor. At this point in time we don't
            //       dynamically add health checks at runtime. If this changes then we
            //       would need to revisit this and scan for transitive health checks
            //       on a periodic basis (you wouldn't want to do it on every pass.
            logger.LogDebug("Resource '{Resource}' has no health checks to monitor.", resource.Name);
            FireResourceReadyEvent(resource, cancellationToken);
 
            return;
        }
 
        var registrationKeysToCheck = annotations.DistinctBy(a => a.Key).Select(a => a.Key).ToFrozenSet();
        logger.LogDebug("Resource '{Resource}' health checks to monitor: {HeathCheckKeys}", resource.Name, string.Join(", ", registrationKeysToCheck));
 
        var lastHealthCheckTimestamp = 0L;
        var lastDelayInterrupted = false;
        var resourceReadyEventFired = false;
        var nonHealthyReportCount = 0;
        ResourceEvent? currentEvent = null;
 
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // If the delay was interrupted after less than a second then delay again.
                // This prevents health checks from being called too frequently.
                if (lastDelayInterrupted && TimeSpan.FromSeconds(1) - timeProvider.GetElapsedTime(lastHealthCheckTimestamp) is { Ticks: > 0 } delay)
                {
                    await state.DelayAsync(currentEvent: null, delay, cancellationToken).ConfigureAwait(false);
                    continue;
                }
 
                var report = await healthCheckService.CheckHealthAsync(
                    r => registrationKeysToCheck.Contains(r.Name),
                    cancellationToken
                    ).ConfigureAwait(false);
 
                logger.LogTrace("Health report status for '{Resource}' is {HealthReportStatus}.", resource.Name, report.Status);
 
                if (report.Status == HealthStatus.Healthy)
                {
                    if (!resourceReadyEventFired)
                    {
                        resourceReadyEventFired = true;
                        FireResourceReadyEvent(resource, cancellationToken);
                    }
                    nonHealthyReportCount = 0;
                }
                else
                {
                    nonHealthyReportCount++;
                }
 
                currentEvent = state.LatestEvent;
 
                if (ContainsAnyHealthReportChange(report, currentEvent.Snapshot.HealthReports))
                {
                    logger.LogTrace("Health reports for '{Resource}' have changed. Publishing updated reports.", resource.Name);
 
                    await resourceNotificationService.PublishUpdateAsync(resource, s =>
                    {
                        var healthReports = MergeHealthReports(s.HealthReports, report);
 
                        return s with
                        {
                            // HealthStatus is automatically re-computed after health reports change.
                            HealthReports = healthReports
                        };
                    }).ConfigureAwait(false);
                }
            }
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                // When debugging sometimes we'll get cancelled here but we don't want
                // to tear down the loop. We only want to crash out when the service's
                // cancellation token is signaled.
                logger.LogTrace(
                    ex,
                    "Health check monitoring loop for resource '{Resource}' observed exception but was ignored.",
                    resource.Name
                    );
            }
 
            lastHealthCheckTimestamp = timeProvider.GetTimestamp();
 
            // Long delay if the resource is healthy.
            // Non-heathy delay increases with each consecutive non-healthy report.
            var delayInterval = nonHealthyReportCount == 0
                ? HealthyHealthCheckInterval
                : NonHealthyHealthCheckStepInterval * Math.Min(5, nonHealthyReportCount);
 
            logger.LogTrace("Resource '{Resource}' health check monitoring loop starting delay of {DelayInterval}.", resource.Name, delayInterval);
 
            lastDelayInterrupted = await state.DelayAsync(currentEvent, delayInterval, cancellationToken).ConfigureAwait(false);
        }
    }
 
    private static bool ContainsAnyHealthReportChange(HealthReport report, ImmutableArray<HealthReportSnapshot> latestHealthReportSnapshots)
    {
        var healthCheckNameToStatus = latestHealthReportSnapshots.ToDictionary(p => p.Name);
        foreach (var (key, value) in report.Entries)
        {
            if (!healthCheckNameToStatus.TryGetValue(key, out var checkReportSnapshot))
            {
                return true;
            }
 
            if (checkReportSnapshot.Status != value.Status
                || !StringComparers.HealthReportPropertyValue.Equals(checkReportSnapshot.Description, value.Description)
                || !StringComparers.HealthReportPropertyValue.Equals(checkReportSnapshot.ExceptionText, value.Exception?.ToString()))
            {
                return true;
            }
        }
 
        return false;
    }
 
    private void FireResourceReadyEvent(IResource resource, CancellationToken cancellationToken)
    {
        logger.LogDebug("Resource '{Resource}' is ready.", resource.Name);
 
        // We don't want to block the monitoring loop while we fire the event.
        _ = Task.Run(async () =>
        {
            var resourceReadyEvent = new ResourceReadyEvent(resource, services);
 
            logger.LogDebug("Publishing ResourceReadyEvent for '{Resource}'.", resource.Name);
 
            // Execute the publish and store the task so that waiters can await it and observe the result.
            var task = eventing.PublishAsync(resourceReadyEvent, cancellationToken);
 
            logger.LogDebug("Waiting for ResourceReadyEvent for '{Resource}'.", resource.Name);
 
            // Suppress exceptions, we just want to make sure that the event is completed.
            await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
 
            logger.LogDebug("ResourceReadyEvent for '{Resource}' completed.", resource.Name);
 
            logger.LogDebug("Publishing the result of ResourceReadyEvent for '{Resource}'.", resource.Name);
 
            await resourceNotificationService.PublishUpdateAsync(resource, s => s with
            {
                ResourceReadyEvent = new(task)
            })
            .ConfigureAwait(false);
        },
        cancellationToken);
    }
 
    private static ImmutableArray<HealthReportSnapshot> MergeHealthReports(ImmutableArray<HealthReportSnapshot> healthReports, HealthReport report)
    {
        var builder = healthReports.ToBuilder();
 
        foreach (var (key, entry) in report.Entries)
        {
            var snapshot = new HealthReportSnapshot(key, entry.Status, entry.Description, entry.Exception?.ToString());
 
            var found = false;
            for (var i = 0; i < builder.Count; i++)
            {
                var existing = builder[i];
                if (existing.Name == key)
                {
                    // Replace the existing entry.
                    builder[i] = snapshot;
                    found = true;
                    break;
                }
            }
 
            if (!found)
            {
                // Add a new entry.
                builder.Add(snapshot);
            }
        }
 
        return builder.ToImmutable();
    }
 
    internal class ResourceMonitorState
    {
        private readonly ILogger _logger;
        private readonly CancellationTokenSource _cts;
        private readonly object _lock = new object();
        private readonly string _resourceName;
        private TaskCompletionSource? _delayInterruptTcs;
 
        public ResourceMonitorState(ILogger logger, ResourceEvent initialEvent, CancellationToken serviceStoppingToken)
        {
            _logger = logger;
            _cts = CancellationTokenSource.CreateLinkedTokenSource(serviceStoppingToken);
            _resourceName = initialEvent.Resource.Name;
            LatestEvent = initialEvent;
 
            _logger.LogDebug("Starting health monitoring for resource '{Resource}'.", _resourceName);
        }
 
        // Used to cancel and exit the monitoring loop for a resource.
        public CancellationToken CancellationToken => _cts.Token;
 
        public ResourceEvent LatestEvent { get; private set; }
 
        public void StopResourceMonitor()
        {
            _logger.LogDebug("Stopping health monitoring for resource '{Resource}'.", _resourceName);
            _cts.Cancel();
        }
 
        public void SetLatestEvent(ResourceEvent resourceEvent)
        {
            // Set the latest event to the monitor. The monitor delay may be interrupted if necessary.
            // A lock protects against a race between starting a delay and setting the latest event.
            lock (_lock)
            {
                var shouldInterrupt = ShouldInterrupt(resourceEvent, LatestEvent);
                LatestEvent = resourceEvent;
 
                if (shouldInterrupt)
                {
                    _delayInterruptTcs?.TrySetResult();
                }
            }
        }
 
        internal async Task<bool> DelayAsync(ResourceEvent? currentEvent, TimeSpan delay, CancellationToken cancellationToken)
        {
            lock (_lock)
            {
                // The event might have changed before delay was called. Interrupt immediately if required.
                if (currentEvent != null && ShouldInterrupt(currentEvent, LatestEvent))
                {
                    _logger.LogTrace("Health monitoring delay interrupted for resource '{Resource}'.", _resourceName);
                    return true;
                }
                _delayInterruptTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            }
 
            try
            {
                await _delayInterruptTcs.Task.WaitAsync(delay, cancellationToken).ConfigureAwait(false);
 
                // Delay was interrupted.
                return true;
            }
            catch (TimeoutException)
            {
                // Delay interval has elapsed.
                return false;
            }
        }
 
        private static bool ShouldInterrupt(ResourceEvent currentEvent, ResourceEvent previousEvent)
        {
            // Interrupt if a newer snapshot is available and the state has changed.
            // This is to ensure that health checks are immediately re-evaluated when the state changes.
            if (currentEvent.Snapshot.Version <= previousEvent.Snapshot.Version)
            {
                return false;
            }
            if (currentEvent.Snapshot.State?.Text == previousEvent.Snapshot.State?.Text)
            {
                return false;
            }
 
            return true;
        }
    }
}