File: Dcp\ApplicationExecutor.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Data;
using System.Diagnostics;
using System.Globalization;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading.Channels;
using Aspire.Dashboard.ConsoleLogs;
using Aspire.Dashboard.Model;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.ConsoleLogs;
using Aspire.Hosting.Dashboard;
using Aspire.Hosting.Dcp.Model;
using Aspire.Hosting.Eventing;
using Aspire.Hosting.Lifecycle;
using Aspire.Hosting.Utils;
using Json.Patch;
using k8s;
using k8s.Autorest;
using k8s.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Retry;
using Polly.Timeout;
 
namespace Aspire.Hosting.Dcp;
 
[DebuggerDisplay("ModelResource = {ModelResource}, DcpResource = {DcpResource}")]
internal class AppResource
{
    public IResource ModelResource { get; }
    public CustomResource DcpResource { get; }
    public virtual List<ServiceAppResource> ServicesProduced { get; } = [];
    public virtual List<ServiceAppResource> ServicesConsumed { get; } = [];
 
    public AppResource(IResource modelResource, CustomResource dcpResource)
    {
        ModelResource = modelResource;
        DcpResource = dcpResource;
    }
}
 
internal sealed class ServiceAppResource : AppResource
{
    public Service Service => (Service)DcpResource;
    public EndpointAnnotation EndpointAnnotation { get; }
 
    public override List<ServiceAppResource> ServicesProduced
    {
        get { throw new InvalidOperationException("Service resources do not produce any services"); }
    }
    public override List<ServiceAppResource> ServicesConsumed
    {
        get { throw new InvalidOperationException("Service resources do not consume any services"); }
    }
 
    public ServiceAppResource(IResource modelResource, Service service, EndpointAnnotation sba) : base(modelResource, service)
    {
        EndpointAnnotation = sba;
    }
}
 
internal sealed class ApplicationExecutor(ILogger<ApplicationExecutor> logger,
                                          ILogger<DistributedApplication> distributedApplicationLogger,
                                          DistributedApplicationModel model,
                                          IKubernetesService kubernetesService,
                                          IEnumerable<IDistributedApplicationLifecycleHook> lifecycleHooks,
                                          IConfiguration configuration,
                                          DistributedApplicationOptions distributedApplicationOptions,
                                          IOptions<DcpOptions> options,
                                          DistributedApplicationExecutionContext executionContext,
                                          ResourceNotificationService notificationService,
                                          ResourceLoggerService loggerService,
                                          IDcpDependencyCheckService dcpDependencyCheckService,
                                          IDistributedApplicationEventing eventing,
                                          IServiceProvider serviceProvider,
                                          DcpNameGenerator nameGenerator
                                          )
{
    private const string DebugSessionPortVar = "DEBUG_SESSION_PORT";
 
    // A random suffix added to every DCP object name ensures that those names (and derived object names, for example container names)
    // are unique machine-wide with a high level of probability.
    // The length of 8 achieves that while keeping the names relatively short and readable.
    // The second purpose of the suffix is to play a role of a unique OpenTelemetry service instance ID.
    private const int RandomNameSuffixLength = 8;
 
    private const string DefaultAspireNetworkName = "default-aspire-network";
 
    private readonly ILogger<ApplicationExecutor> _logger = logger;
    private readonly DistributedApplicationModel _model = model;
    private readonly Dictionary<string, IResource> _applicationModel = model.Resources.ToDictionary(r => r.Name);
    private readonly ILookup<IResource?, IResourceWithParent> _parentChildLookup = GetParentChildLookup(model);
    private readonly IDistributedApplicationLifecycleHook[] _lifecycleHooks = lifecycleHooks.ToArray();
    private readonly DistributedApplicationOptions _distributedApplicationOptions = distributedApplicationOptions;
    private readonly IOptions<DcpOptions> _options = options;
    private readonly DistributedApplicationExecutionContext _executionContext = executionContext;
    private readonly List<AppResource> _appResources = [];
    private readonly CancellationTokenSource _shutdownCancellation = new();
 
    private readonly ConcurrentDictionary<string, Container> _containersMap = [];
    private readonly ConcurrentDictionary<string, Executable> _executablesMap = [];
    private readonly ConcurrentDictionary<string, Service> _servicesMap = [];
    private readonly ConcurrentDictionary<string, Endpoint> _endpointsMap = [];
    private readonly ConcurrentDictionary<(string, string), List<string>> _resourceAssociatedServicesMap = [];
    private readonly ConcurrentDictionary<string, (CancellationTokenSource Cancellation, Task Task)> _logStreams = new();
    private DcpInfo? _dcpInfo;
    private Task? _resourceWatchTask;
 
    private readonly record struct LogInformationEntry(string ResourceName, bool? LogsAvailable, bool? HasSubscribers);
    private readonly Channel<LogInformationEntry> _logInformationChannel = Channel.CreateUnbounded<LogInformationEntry>(
        new UnboundedChannelOptions { SingleReader = true });
 
    private string DefaultContainerHostName => configuration["AppHost:ContainerHostname"] ?? _dcpInfo?.Containers?.ContainerHostName ?? "host.docker.internal";
 
    public async Task RunApplicationAsync(CancellationToken cancellationToken = default)
    {
        AspireEventSource.Instance.DcpModelCreationStart();
 
        _dcpInfo = await dcpDependencyCheckService.GetDcpInfoAsync(cancellationToken).ConfigureAwait(false);
 
        Debug.Assert(_dcpInfo is not null, "DCP info should not be null at this point");
 
        try
        {
            PrepareServices();
            PrepareContainers();
            PrepareExecutables();
 
            await PublishResourcesWithInitialStateAsync().ConfigureAwait(false);
 
            // Watch for changes to the resource state.
            WatchResourceChanges();
 
            await CreateServicesAsync(cancellationToken).ConfigureAwait(false);
 
            await CreateContainerNetworksAsync(cancellationToken).ConfigureAwait(false);
 
            await CreateContainersAndExecutablesAsync(cancellationToken).ConfigureAwait(false);
 
            var afterResourcesCreatedEvent = new AfterResourcesCreatedEvent(serviceProvider, _model);
            await eventing.PublishAsync(afterResourcesCreatedEvent, cancellationToken).ConfigureAwait(false);
 
            foreach (var lifecycleHook in _lifecycleHooks)
            {
                await lifecycleHook.AfterResourcesCreatedAsync(_model, cancellationToken).ConfigureAwait(false);
            }
        }
        catch
        {
            _shutdownCancellation.Cancel();
            throw;
        }
        finally
        {
            AspireEventSource.Instance.DcpModelCreationStop();
        }
    }
 
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _shutdownCancellation.Cancel();
        var tasks = new List<Task>();
        if (_resourceWatchTask is { } resourceTask)
        {
            tasks.Add(resourceTask);
        }
 
        foreach (var (_, (cancellation, logTask)) in _logStreams)
        {
            cancellation.Cancel();
            tasks.Add(logTask);
        }
 
        try
        {
            await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Ignore.
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "One or more monitoring tasks terminated with an error.");
        }
    }
 
    private static ILookup<IResource?, IResourceWithParent> GetParentChildLookup(DistributedApplicationModel model)
    {
        static IResource? SelectParentContainerResource(IResource resource) => resource switch
        {
            IResourceWithParent rp => SelectParentContainerResource(rp.Parent),
            IResource r when r.IsContainer() => r,
            _ => null
        };
 
        // parent -> children lookup
        return model.Resources.OfType<IResourceWithParent>()
                              .Select(x => (Child: x, Root: SelectParentContainerResource(x.Parent)))
                              .Where(x => x.Root is not null)
                              .ToLookup(x => x.Root, x => x.Child);
    }
 
    // Sets the state of the resource's children
    async Task SetChildResourceAsync(IResource resource, string parentName, string? state, DateTime? startTimeStamp, DateTime? stopTimeStamp)
    {
        foreach (var child in _parentChildLookup[resource])
        {
            await notificationService.PublishUpdateAsync(child, s => s with
            {
                State = state,
                StartTimeStamp = startTimeStamp,
                StopTimeStamp = stopTimeStamp,
                Properties = s.Properties.SetResourceProperty(KnownProperties.Resource.ParentName, parentName)
            }).ConfigureAwait(false);
        }
    }
 
    private async Task PublishResourcesWithInitialStateAsync()
    {
        // Publish the initial state of the resources that have a snapshot annotation.
        foreach (var resource in _model.Resources)
        {
            await notificationService.PublishUpdateAsync(resource, s =>
            {
                return s with
                {
                    HealthReports = GetInitialHealthReports(resource)
                };
            }).ConfigureAwait(false);
        }
 
        static ImmutableArray<HealthReportSnapshot> GetInitialHealthReports(IResource resource)
        {
            if (!resource.TryGetAnnotationsIncludingAncestorsOfType<HealthCheckAnnotation>(out var annotations))
            {
                return [];
            }
 
            var reports = annotations.Select(annotation => new HealthReportSnapshot(annotation.Key, null, null, null));
            return [.. reports];
        }
    }
 
    private void WatchResourceChanges()
    {
        var outputSemaphore = new SemaphoreSlim(1);
 
        var cancellationToken = _shutdownCancellation.Token;
        var watchResourcesTask = Task.Run(async () =>
        {
            using (outputSemaphore)
            {
                await Task.WhenAll(
                    Task.Run(() => WatchKubernetesResourceAsync<Executable>((t, r) => ProcessResourceChange(t, r, _executablesMap, "Executable", ToSnapshot))),
                    Task.Run(() => WatchKubernetesResourceAsync<Container>((t, r) => ProcessResourceChange(t, r, _containersMap, "Container", ToSnapshot))),
                    Task.Run(() => WatchKubernetesResourceAsync<Service>(ProcessServiceChange)),
                    Task.Run(() => WatchKubernetesResourceAsync<Endpoint>(ProcessEndpointChange))).ConfigureAwait(false);
            }
        });
 
        var watchSubscribersTask = Task.Run(async () =>
        {
            await foreach (var subscribers in loggerService.WatchAnySubscribersAsync(cancellationToken).ConfigureAwait(false))
            {
                _logInformationChannel.Writer.TryWrite(new(subscribers.Name, LogsAvailable: null, subscribers.AnySubscribers));
            }
        });
 
        // Listen to the "log information channel" - which contains updates when resources have logs available and when they have subscribers.
        // A resource needs both logs available and subscribers before it starts streaming its logs.
        // We only want to start the log stream for resources when they have subscribers.
        // And when there are no more subscribers, we want to stop the stream.
        var watchInformationChannelTask = Task.Run(async () =>
        {
            var resourceLogState = new Dictionary<string, (bool logsAvailable, bool hasSubscribers)>();
 
            await foreach (var entry in _logInformationChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                var logsAvailable = false;
                var hasSubscribers = false;
                if (resourceLogState.TryGetValue(entry.ResourceName, out (bool, bool) stateEntry))
                {
                    (logsAvailable, hasSubscribers) = stateEntry;
                }
 
                // LogsAvailable can only go from false => true. Once it is true, it can never go back to false.
                Debug.Assert(!entry.LogsAvailable.HasValue || entry.LogsAvailable.Value, "entry.LogsAvailable should never be 'false'");
 
                logsAvailable = entry.LogsAvailable ?? logsAvailable;
                hasSubscribers = entry.HasSubscribers ?? hasSubscribers;
 
                if (logsAvailable)
                {
                    if (hasSubscribers)
                    {
                        if (_containersMap.TryGetValue(entry.ResourceName, out var container))
                        {
                            StartLogStream(container);
                        }
                        else if (_executablesMap.TryGetValue(entry.ResourceName, out var executable))
                        {
                            StartLogStream(executable);
                        }
                    }
                    else
                    {
                        if (_logStreams.TryRemove(entry.ResourceName, out var logStream))
                        {
                            logStream.Cancellation.Cancel();
                        }
                    }
                }
 
                resourceLogState[entry.ResourceName] = (logsAvailable, hasSubscribers);
            }
        });
 
        _resourceWatchTask = Task.WhenAll(watchResourcesTask, watchSubscribersTask, watchInformationChannelTask);
 
        async Task WatchKubernetesResourceAsync<T>(Func<WatchEventType, T, Task> handler) where T : CustomResource
        {
            var retryUntilCancelled = new RetryStrategyOptions()
            {
                ShouldHandle = new PredicateBuilder().HandleInner<EndOfStreamException>(),
                BackoffType = DelayBackoffType.Exponential,
                MaxRetryAttempts = int.MaxValue,
                UseJitter = true,
                MaxDelay = TimeSpan.FromSeconds(30),
                OnRetry = (retry) =>
                {
                    _logger.LogDebug(
                        retry.Outcome.Exception,
                        "Long poll watch operation was ended by server after {LongPollDurationInMs} milliseconds (iteration {Iteration}).",
                        retry.Duration.TotalMilliseconds,
                        retry.AttemptNumber
                        );
                    return ValueTask.CompletedTask;
                }
            };
 
            var pipeline = new ResiliencePipelineBuilder().AddRetry(retryUntilCancelled).Build();
 
            try
            {
                _logger.LogDebug("Watching over DCP {ResourceType} resources.", typeof(T).Name);
                await pipeline.ExecuteAsync(async (pipelineCancellationToken) =>
                {
                    await foreach (var (eventType, resource) in kubernetesService.WatchAsync<T>(cancellationToken: pipelineCancellationToken).ConfigureAwait<(global::k8s.WatchEventType, T)>(false))
                    {
                        await outputSemaphore.WaitAsync(pipelineCancellationToken).ConfigureAwait(false);
 
                        try
                        {
                            await handler(eventType, resource).ConfigureAwait(false);
                        }
                        finally
                        {
                            outputSemaphore.Release();
                        }
                    }
                }, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown requested.
                _logger.LogDebug("Cancellation received while watching {ResourceType} resources.", typeof(T).Name);
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Watch task over Kubernetes {ResourceType} resources terminated unexpectedly.", typeof(T).Name);
            }
            finally
            {
                _logger.LogDebug("Stopped watching {ResourceType} resources.", typeof(T).Name);
            }
        }
    }
 
    private async Task ProcessResourceChange<T>(WatchEventType watchEventType, T resource, ConcurrentDictionary<string, T> resourceByName, string resourceKind, Func<T, CustomResourceSnapshot, CustomResourceSnapshot> snapshotFactory) where T : CustomResource
    {
        if (ProcessResourceChange(resourceByName, watchEventType, resource))
        {
            UpdateAssociatedServicesMap();
 
            var changeType = watchEventType switch
            {
                WatchEventType.Added or WatchEventType.Modified => ResourceSnapshotChangeType.Upsert,
                WatchEventType.Deleted => ResourceSnapshotChangeType.Delete,
                _ => throw new System.ComponentModel.InvalidEnumArgumentException($"Cannot convert {nameof(WatchEventType)} with value {watchEventType} into enum of type {nameof(ResourceSnapshotChangeType)}.")
            };
 
            // Find the associated application model resource and update it.
            var resourceName = resource.AppModelResourceName;
 
            if (resourceName is not null &&
                _applicationModel.TryGetValue(resourceName, out var appModelResource))
            {
                if (changeType == ResourceSnapshotChangeType.Delete)
                {
                    // Stop the log stream for the resource
                    if (_logStreams.TryRemove(resource.Metadata.Name, out var logStream))
                    {
                        logStream.Cancellation.Cancel();
                    }
 
                    // TODO: Handle resource deletion
                    if (_logger.IsEnabled(LogLevel.Trace))
                    {
                        _logger.LogTrace("Deleting application model resource {ResourceName} with {ResourceKind} resource {ResourceName}", appModelResource.Name, resourceKind, resource.Metadata.Name);
                    }
                }
                else
                {
                    if (_logger.IsEnabled(LogLevel.Trace))
                    {
                        _logger.LogTrace("Updating application model resource {ResourceName} with {ResourceKind} resource {ResourceName}", appModelResource.Name, resourceKind, resource.Metadata.Name);
                    }
 
                    // Notifications are associated with the application model resource, so we need to update with that context
                    await notificationService.PublishUpdateAsync(appModelResource, resource.Metadata.Name, s => snapshotFactory(resource, s)).ConfigureAwait(false);
 
                    if (resource is Container { LogsAvailable: true } ||
                        resource is Executable { LogsAvailable: true })
                    {
                        _logInformationChannel.Writer.TryWrite(new(resource.Metadata.Name, LogsAvailable: true, HasSubscribers: null));
                    }
                }
 
                // Update all child resources of containers
                if (resource is Container c && c.Status is { } status)
                {
                    await SetChildResourceAsync(
                        appModelResource,
                        resource.Metadata.Name,
                        status.State,
                        status.StartupTimestamp?.ToUniversalTime(),
                        status.FinishTimestamp?.ToUniversalTime()).ConfigureAwait(false);
                }
            }
            else
            {
                // No application model resource found for the DCP resource.
                if (_logger.IsEnabled(LogLevel.Trace))
                {
                    _logger.LogTrace("No application model resource found for {ResourceKind} resource {ResourceName}", resourceKind, resource.Metadata.Name);
                }
            }
        }
 
        void UpdateAssociatedServicesMap()
        {
            // We keep track of associated services for the resource
            // So whenever we get the service we can figure out if the service can generate endpoint for the resource
            if (watchEventType == WatchEventType.Deleted)
            {
                _resourceAssociatedServicesMap.Remove((resourceKind, resource.Metadata.Name), out _);
            }
            else if (resource.Metadata.Annotations?.TryGetValue(CustomResource.ServiceProducerAnnotation, out var servicesProducedAnnotationJson) == true)
            {
                var serviceProducerAnnotations = JsonSerializer.Deserialize<ServiceProducerAnnotation[]>(servicesProducedAnnotationJson);
                if (serviceProducerAnnotations is not null)
                {
                    _resourceAssociatedServicesMap[(resourceKind, resource.Metadata.Name)]
                        = serviceProducerAnnotations.Select(e => e.ServiceName).ToList();
                }
            }
        }
    }
 
    private void StartLogStream<T>(T resource) where T : CustomResource
    {
        IAsyncEnumerable<IReadOnlyList<(string, bool)>>? enumerable = resource switch
        {
            Container c when c.LogsAvailable => new ResourceLogSource<T>(_logger, kubernetesService, _dcpInfo?.Version, resource),
            Executable e when e.LogsAvailable => new ResourceLogSource<T>(_logger, kubernetesService, _dcpInfo?.Version, resource),
            _ => null
        };
 
        // No way to get logs for this resource as yet
        if (enumerable is null)
        {
            return;
        }
 
        // This does not run concurrently for the same resource so we can safely use GetOrAdd without
        // creating multiple log streams.
        _logStreams.GetOrAdd(resource.Metadata.Name, (_) =>
        {
            var cancellation = new CancellationTokenSource();
 
            var task = Task.Run(async () =>
            {
                try
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug("Starting log streaming for {ResourceName}.", resource.Metadata.Name);
                    }
 
                    // Pump the logs from the enumerable into the logger
                    var logger = loggerService.GetInternalLogger(resource.Metadata.Name);
 
                    await foreach (var batch in enumerable.WithCancellation(cancellation.Token).ConfigureAwait(false))
                    {
                        foreach (var (content, isError) in batch)
                        {
                            DateTime? timestamp = null;
                            var resolvedContent = content;
 
                            if (TimestampParser.TryParseConsoleTimestamp(resolvedContent, out var result))
                            {
                                resolvedContent = result.Value.ModifiedText;
                                timestamp = result.Value.Timestamp.UtcDateTime;
                            }
 
                            logger(LogEntry.Create(timestamp, resolvedContent, content, isError));
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ignore
                    _logger.LogDebug("Log streaming for {ResourceName} was cancelled.", resource.Metadata.Name);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error streaming logs for {ResourceName}.", resource.Metadata.Name);
                }
            },
            cancellation.Token);
 
            return (cancellation, task);
        });
    }
 
    private async Task ProcessEndpointChange(WatchEventType watchEventType, Endpoint endpoint)
    {
        if (!ProcessResourceChange(_endpointsMap, watchEventType, endpoint))
        {
            return;
        }
 
        if (endpoint.Metadata.OwnerReferences is null)
        {
            return;
        }
 
        foreach (var ownerReference in endpoint.Metadata.OwnerReferences)
        {
            await TryRefreshResource(ownerReference.Kind, ownerReference.Name).ConfigureAwait(false);
        }
    }
 
    private async Task ProcessServiceChange(WatchEventType watchEventType, Service service)
    {
        if (!ProcessResourceChange(_servicesMap, watchEventType, service))
        {
            return;
        }
 
        foreach (var ((resourceKind, resourceName), _) in _resourceAssociatedServicesMap.Where(e => e.Value.Contains(service.Metadata.Name)))
        {
            await TryRefreshResource(resourceKind, resourceName).ConfigureAwait(false);
        }
    }
 
    private async ValueTask TryRefreshResource(string resourceKind, string resourceName)
    {
        CustomResource? cr = resourceKind switch
        {
            "Container" => _containersMap.TryGetValue(resourceName, out var container) ? container : null,
            "Executable" => _executablesMap.TryGetValue(resourceName, out var executable) ? executable : null,
            _ => null
        };
 
        if (cr is not null)
        {
            var appModelResourceName = cr.AppModelResourceName;
 
            if (appModelResourceName is not null &&
                _applicationModel.TryGetValue(appModelResourceName, out var appModelResource))
            {
                await notificationService.PublishUpdateAsync(appModelResource, cr.Metadata.Name, s =>
                {
                    if (cr is Container container)
                    {
                        return ToSnapshot(container, s);
                    }
                    else if (cr is Executable exe)
                    {
                        return ToSnapshot(exe, s);
                    }
                    return s;
                })
                .ConfigureAwait(false);
            }
        }
    }
 
    private CustomResourceSnapshot ToSnapshot(Container container, CustomResourceSnapshot previous)
    {
        var containerId = container.Status?.ContainerId;
        var urls = GetUrls(container);
        var volumes = GetVolumes(container);
 
        var environment = GetEnvironmentVariables(container.Status?.EffectiveEnv ?? container.Spec.Env, container.Spec.Env);
        var state = container.AppModelInitialState == KnownResourceStates.Hidden ? KnownResourceStates.Hidden : container.Status?.State;
 
        var relationships = ImmutableArray<RelationshipSnapshot>.Empty;
        if (container.AppModelResourceName is not null &&
            _applicationModel.TryGetValue(container.AppModelResourceName, out var appModelResource))
        {
            relationships = ResourceSnapshotBuilder.BuildRelationships(appModelResource);
        }
 
        return previous with
        {
            ResourceType = KnownResourceTypes.Container,
            State = state,
            // Map a container exit code of -1 (unknown) to null
            ExitCode = container.Status?.ExitCode is null or Conventions.UnknownExitCode ? null : container.Status.ExitCode,
            Properties = [
                new(KnownProperties.Container.Image, container.Spec.Image),
                new(KnownProperties.Container.Id, containerId),
                new(KnownProperties.Container.Command, container.Spec.Command),
                new(KnownProperties.Container.Args, container.Status?.EffectiveArgs ?? []) { IsSensitive = true },
                new(KnownProperties.Container.Ports, GetPorts()),
                new(KnownProperties.Container.Lifetime, GetContainerLifetime()),
            ],
            EnvironmentVariables = environment,
            CreationTimeStamp = container.Metadata.CreationTimestamp?.ToUniversalTime(),
            StartTimeStamp = container.Status?.StartupTimestamp?.ToUniversalTime(),
            StopTimeStamp = container.Status?.FinishTimestamp?.ToUniversalTime(),
            Urls = urls,
            Volumes = volumes,
            Relationships = relationships
        };
 
        ImmutableArray<int> GetPorts()
        {
            if (container.Spec.Ports is null)
            {
                return [];
            }
 
            var ports = ImmutableArray.CreateBuilder<int>();
            foreach (var port in container.Spec.Ports)
            {
                if (port.ContainerPort != null)
                {
                    ports.Add(port.ContainerPort.Value);
                }
            }
            return ports.ToImmutable();
        }
 
        ContainerLifetime GetContainerLifetime()
        {
            return (container.Spec.Persistent ?? false) ? ContainerLifetime.Persistent : ContainerLifetime.Session;
        }
    }
 
    private CustomResourceSnapshot ToSnapshot(Executable executable, CustomResourceSnapshot previous)
    {
        string? projectPath = null;
        IResource? appModelResource = null;
 
        if (executable.AppModelResourceName is not null &&
            _applicationModel.TryGetValue(executable.AppModelResourceName, out appModelResource))
        {
            projectPath = appModelResource is ProjectResource p ? p.GetProjectMetadata().ProjectPath : null;
        }
 
        var state = executable.AppModelInitialState is "Hidden" ? "Hidden" : executable.Status?.State;
 
        var urls = GetUrls(executable);
 
        var environment = GetEnvironmentVariables(executable.Status?.EffectiveEnv, executable.Spec.Env);
 
        var relationships = ImmutableArray<RelationshipSnapshot>.Empty;
        if (appModelResource != null)
        {
            relationships = ResourceSnapshotBuilder.BuildRelationships(appModelResource);
        }
 
        if (projectPath is not null)
        {
            return previous with
            {
                ResourceType = KnownResourceTypes.Project,
                State = state,
                ExitCode = executable.Status?.ExitCode,
                Properties = [
                    new(KnownProperties.Executable.Path, executable.Spec.ExecutablePath),
                    new(KnownProperties.Executable.WorkDir, executable.Spec.WorkingDirectory),
                    new(KnownProperties.Executable.Args, executable.Status?.EffectiveArgs ?? []) { IsSensitive = true },
                    new(KnownProperties.Executable.Pid, executable.Status?.ProcessId),
                    new(KnownProperties.Project.Path, projectPath)
                ],
                EnvironmentVariables = environment,
                CreationTimeStamp = executable.Metadata.CreationTimestamp?.ToUniversalTime(),
                StartTimeStamp = executable.Status?.StartupTimestamp?.ToUniversalTime(),
                StopTimeStamp = executable.Status?.FinishTimestamp?.ToUniversalTime(),
                Urls = urls,
                Relationships = relationships
            };
        }
 
        return previous with
        {
            ResourceType = KnownResourceTypes.Executable,
            State = state,
            ExitCode = executable.Status?.ExitCode,
            Properties = [
                new(KnownProperties.Executable.Path, executable.Spec.ExecutablePath),
                new(KnownProperties.Executable.WorkDir, executable.Spec.WorkingDirectory),
                new(KnownProperties.Executable.Args, executable.Status?.EffectiveArgs ?? []) { IsSensitive = true },
                new(KnownProperties.Executable.Pid, executable.Status?.ProcessId)
            ],
            EnvironmentVariables = environment,
            CreationTimeStamp = executable.Metadata.CreationTimestamp?.ToUniversalTime(),
            StartTimeStamp = executable.Status?.StartupTimestamp?.ToUniversalTime(),
            StopTimeStamp = executable.Status?.FinishTimestamp?.ToUniversalTime(),
            Urls = urls,
            Relationships = relationships
        };
    }
 
    private ImmutableArray<UrlSnapshot> GetUrls(CustomResource resource)
    {
        var name = resource.Metadata.Name;
 
        var urls = ImmutableArray.CreateBuilder<UrlSnapshot>();
 
        foreach (var (_, endpoint) in _endpointsMap)
        {
            if (endpoint.Metadata.OwnerReferences?.Any(or => or.Kind == resource.Kind && or.Name == name) != true)
            {
                continue;
            }
 
            if (endpoint.Spec.ServiceName is not null &&
                _servicesMap.TryGetValue(endpoint.Spec.ServiceName, out var service) &&
                service.AppModelResourceName is string resourceName &&
                _applicationModel.TryGetValue(resourceName, out var appModelResource) &&
                appModelResource is IResourceWithEndpoints resourceWithEndpoints &&
                service.EndpointName is string endpointName)
            {
                var ep = resourceWithEndpoints.GetEndpoint(endpointName);
 
                if (ep.EndpointAnnotation.FromLaunchProfile &&
                    appModelResource is ProjectResource p &&
                    p.GetEffectiveLaunchProfile()?.LaunchProfile is LaunchProfile profile &&
                    profile.LaunchUrl is string launchUrl)
                {
                    // Concat the launch url from the launch profile to the urls with IsFromLaunchProfile set to true
 
                    string CombineUrls(string url, string launchUrl)
                    {
                        if (!launchUrl.Contains("://"))
                        {
                            // This is relative URL
                            url += $"/{launchUrl}";
                        }
                        else
                        {
                            // For absolute URL we need to update the port value if possible
                            if (profile.ApplicationUrl is string applicationUrl
                                && launchUrl.StartsWith(applicationUrl))
                            {
                                url = launchUrl.Replace(applicationUrl, url);
                            }
                        }
 
                        return url;
                    }
 
                    if (ep.IsAllocated)
                    {
                        var url = CombineUrls(ep.Url, launchUrl);
 
                        urls.Add(new(Name: ep.EndpointName, Url: url, IsInternal: false));
                    }
                }
                else
                {
                    if (ep.IsAllocated)
                    {
                        urls.Add(new(Name: ep.EndpointName, Url: ep.Url, IsInternal: false));
                    }
                }
 
                if (ep.EndpointAnnotation.IsProxied)
                {
                    var endpointString = $"{ep.Scheme}://{endpoint.Spec.Address}:{endpoint.Spec.Port}";
                    urls.Add(new(Name: $"{ep.EndpointName} target port", Url: endpointString, IsInternal: true));
                }
            }
        }
 
        return urls.ToImmutable();
    }
 
    private static ImmutableArray<VolumeSnapshot> GetVolumes(CustomResource resource)
    {
        if (resource is Container container)
        {
            return container.Spec.VolumeMounts?.Select(v => new VolumeSnapshot(v.Source, v.Target ?? "", v.Type, v.IsReadOnly)).ToImmutableArray() ?? [];
        }
 
        return [];
    }
 
    private static ImmutableArray<EnvironmentVariableSnapshot> GetEnvironmentVariables(List<EnvVar>? effectiveSource, List<EnvVar>? specSource)
    {
        if (effectiveSource is null or { Count: 0 })
        {
            return [];
        }
 
        var environment = ImmutableArray.CreateBuilder<EnvironmentVariableSnapshot>(effectiveSource.Count);
 
        foreach (var env in effectiveSource)
        {
            if (env.Name is not null)
            {
                var isFromSpec = specSource?.Any(e => string.Equals(e.Name, env.Name, StringComparison.Ordinal)) is true or null;
 
                environment.Add(new(env.Name, env.Value ?? "", isFromSpec));
            }
        }
 
        environment.Sort((v1, v2) => string.Compare(v1.Name, v2.Name, StringComparison.Ordinal));
 
        return environment.ToImmutable();
    }
 
    private static bool ProcessResourceChange<T>(ConcurrentDictionary<string, T> map, WatchEventType watchEventType, T resource)
            where T : CustomResource
    {
        switch (watchEventType)
        {
            case WatchEventType.Added:
                map.TryAdd(resource.Metadata.Name, resource);
                break;
 
            case WatchEventType.Modified:
                map[resource.Metadata.Name] = resource;
                break;
 
            case WatchEventType.Deleted:
                map.Remove(resource.Metadata.Name, out _);
                break;
 
            default:
                return false;
        }
 
        return true;
    }
 
    private async Task CreateServicesAsync(CancellationToken cancellationToken = default)
    {
        try
        {
            AspireEventSource.Instance.DcpServicesCreationStart();
 
            var needAddressAllocated = _appResources.OfType<ServiceAppResource>()
                .Where(sr => !sr.Service.HasCompleteAddress && sr.Service.Spec.AddressAllocationMode != AddressAllocationModes.Proxyless)
                .ToList();
 
            await CreateResourcesAsync<Service>(cancellationToken).ConfigureAwait(false);
 
            if (needAddressAllocated.Count == 0)
            {
                // No need to wait for any updates to Service objects from the orchestrator.
                return;
            }
 
            var withTimeout = new TimeoutStrategyOptions()
            {
                Timeout = _options.Value.ServiceStartupWatchTimeout
            };
 
            var tryTwice = new RetryStrategyOptions()
            {
                BackoffType = DelayBackoffType.Constant,
                MaxDelay = TimeSpan.FromSeconds(1),
                UseJitter = true,
                MaxRetryAttempts = 1,
                ShouldHandle = new PredicateBuilder().Handle<Exception>(),
                OnRetry = (retry) =>
                {
                    _logger.LogDebug(
                        retry.Outcome.Exception,
                        "Watching for service port allocation ended with an error after {WatchDurationMs} (iteration {Iteration})",
                        retry.Duration.TotalMilliseconds,
                        retry.AttemptNumber
                    );
                    return ValueTask.CompletedTask;
                }
            };
 
            var execution = new ResiliencePipelineBuilder().AddRetry(tryTwice).AddTimeout(withTimeout).Build();
 
            await execution.ExecuteAsync(async (attemptCancellationToken) =>
            {
                IAsyncEnumerable<(WatchEventType, Service)> serviceChangeEnumerator = kubernetesService.WatchAsync<Service>(cancellationToken: attemptCancellationToken);
                await foreach (var (evt, updated) in serviceChangeEnumerator.ConfigureAwait(false))
                {
                    if (evt == WatchEventType.Bookmark) { continue; } // Bookmarks do not contain any data.
 
                    var srvResource = needAddressAllocated.FirstOrDefault(sr => sr.Service.Metadata.Name == updated.Metadata.Name);
                    if (srvResource == null) { continue; } // This service most likely already has full address information, so it is not on needAddressAllocated list.
 
                    if (updated.HasCompleteAddress)
                    {
                        srvResource.Service.ApplyAddressInfoFrom(updated);
                        needAddressAllocated.Remove(srvResource);
                    }
 
                    if (needAddressAllocated.Count == 0)
                    {
                        return; // We are done
                    }
                }
            }, cancellationToken).ConfigureAwait(false);
 
            // If there are still services that need address allocated, try a final direct query in case the watch missed some updates.
            foreach (var sar in needAddressAllocated)
            {
                var dcpSvc = await kubernetesService.GetAsync<Service>(sar.Service.Metadata.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
                if (dcpSvc.HasCompleteAddress)
                {
                    sar.Service.ApplyAddressInfoFrom(dcpSvc);
                }
                else
                {
                    distributedApplicationLogger.LogWarning("Unable to allocate a network port for service '{ServiceName}'; service may be unreachable and its clients may not work properly.", sar.Service.Metadata.Name);
                }
            }
 
        }
        finally
        {
            AspireEventSource.Instance.DcpServicesCreationStop();
        }
    }
 
    private async Task CreateContainerNetworksAsync(CancellationToken cancellationToken)
    {
        var toCreate = _appResources.Where(r => r.DcpResource is ContainerNetwork);
        foreach (var containerNetwork in toCreate)
        {
            if (containerNetwork.DcpResource is ContainerNetwork cn)
            {
                await kubernetesService.CreateAsync(cn, cancellationToken).ConfigureAwait(false);
            }
        }
    }
 
    private async Task CreateContainersAndExecutablesAsync(CancellationToken cancellationToken)
    {
        var toCreate = _appResources.Where(r => r.DcpResource is Container || r.DcpResource is Executable || r.DcpResource is ExecutableReplicaSet);
        AddAllocatedEndpointInfo(toCreate);
 
        var afterEndpointsAllocatedEvent = new AfterEndpointsAllocatedEvent(serviceProvider, _model);
        await eventing.PublishAsync(afterEndpointsAllocatedEvent, cancellationToken).ConfigureAwait(false);
 
        foreach (var lifecycleHook in _lifecycleHooks)
        {
            await lifecycleHook.AfterEndpointsAllocatedAsync(_model, cancellationToken).ConfigureAwait(false);
        }
 
        var containersTask = CreateContainersAsync(toCreate.Where(ar => ar.DcpResource is Container), cancellationToken);
        var executablesTask = CreateExecutablesAsync(toCreate.Where(ar => ar.DcpResource is Executable || ar.DcpResource is ExecutableReplicaSet), cancellationToken);
 
        await Task.WhenAll(containersTask, executablesTask).ConfigureAwait(false);
    }
 
    private void AddAllocatedEndpointInfo(IEnumerable<AppResource> resources)
    {
        var containerHost = DefaultContainerHostName;
 
        foreach (var appResource in resources)
        {
            foreach (var sp in appResource.ServicesProduced)
            {
                var svc = (Service)sp.DcpResource;
 
                if (!svc.HasCompleteAddress && sp.EndpointAnnotation.IsProxied)
                {
                    // This should never happen; if it does, we have a bug without a workaround for th the user.
                    throw new InvalidDataException($"Service {svc.Metadata.Name} should have valid address at this point");
                }
 
                if (!sp.EndpointAnnotation.IsProxied && svc.AllocatedPort is null)
                {
                    throw new InvalidOperationException($"Service '{svc.Metadata.Name}' needs to specify a port for endpoint '{sp.EndpointAnnotation.Name}' since it isn't using a proxy.");
                }
 
                sp.EndpointAnnotation.AllocatedEndpoint = new AllocatedEndpoint(
                    sp.EndpointAnnotation,
                    "localhost",
                    (int)svc.AllocatedPort!,
                    containerHostAddress: appResource.ModelResource.IsContainer() ? containerHost : null,
                    targetPortExpression: $$$"""{{- portForServing "{{{svc.Metadata.Name}}}" -}}""");
            }
        }
    }
 
    private void PrepareServices()
    {
        var serviceProducers = _model.Resources
            .Select(r => (ModelResource: r, Endpoints: r.Annotations.OfType<EndpointAnnotation>()))
            .Where(sp => sp.Endpoints.Any());
 
        // We need to ensure that Services have unique names (otherwise we cannot really distinguish between
        // services produced by different resources).
        HashSet<string> serviceNames = [];
 
        foreach (var sp in serviceProducers)
        {
            var endpoints = sp.Endpoints.ToArray();
 
            foreach (var endpoint in endpoints)
            {
                var candidateServiceName = endpoints.Length == 1
                    ? GetObjectNameForResource(sp.ModelResource)
                    : GetObjectNameForResource(sp.ModelResource, endpoint.Name);
 
                var uniqueServiceName = GenerateUniqueServiceName(serviceNames, candidateServiceName);
                var svc = Service.Create(uniqueServiceName);
 
                var port = _options.Value.RandomizePorts && endpoint.IsProxied ? null : endpoint.Port;
                svc.Spec.Port = port;
                svc.Spec.Protocol = PortProtocol.FromProtocolType(endpoint.Protocol);
                svc.Spec.Address = endpoint.TargetHost switch
                {
                    "*" or "+" => "0.0.0.0",
                    _ => endpoint.TargetHost
                };
                svc.Spec.AddressAllocationMode = endpoint.IsProxied ? AddressAllocationModes.Localhost : AddressAllocationModes.Proxyless;
 
                // So we can associate the service with the resource that produced it and the endpoint it represents.
                svc.Annotate(CustomResource.ResourceNameAnnotation, sp.ModelResource.Name);
                svc.Annotate(CustomResource.EndpointNameAnnotation, endpoint.Name);
 
                _appResources.Add(new ServiceAppResource(sp.ModelResource, svc, endpoint));
            }
        }
    }
 
    private void PrepareExecutables()
    {
        PrepareProjectExecutables();
        PreparePlainExecutables();
    }
 
    private void PreparePlainExecutables()
    {
        var modelExecutableResources = _model.GetExecutableResources();
 
        foreach (var executable in modelExecutableResources)
        {
            EnsureRequiredAnnotations(executable);
 
            var exeInstance = GetDcpInstance(executable, instanceIndex: 0);
            var exePath = executable.Command;
            var exe = Executable.Create(exeInstance.Name, exePath);
 
            // The working directory is always relative to the app host project directory (if it exists).
            exe.Spec.WorkingDirectory = executable.WorkingDirectory;
            exe.Spec.ExecutionType = ExecutionType.Process;
            exe.Annotate(CustomResource.OtelServiceNameAnnotation, executable.Name);
            exe.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, exeInstance.Suffix);
            exe.Annotate(CustomResource.ResourceNameAnnotation, executable.Name);
            SetInitialResourceState(executable, exe);
 
            var exeAppResource = new AppResource(executable, exe);
            AddServicesProducedInfo(executable, exe, exeAppResource);
            _appResources.Add(exeAppResource);
        }
    }
 
    private void PrepareProjectExecutables()
    {
        var modelProjectResources = _model.GetProjectResources();
 
        foreach (var project in modelProjectResources)
        {
            if (!project.TryGetLastAnnotation<IProjectMetadata>(out var projectMetadata))
            {
                throw new InvalidOperationException("A project resource is missing required metadata"); // Should never happen.
            }
 
            EnsureRequiredAnnotations(project);
 
            var replicas = project.GetReplicaCount();
 
            for (var i = 0; i < replicas; i++)
            {
                var exeInstance = GetDcpInstance(project, instanceIndex: i);
                var exeSpec = Executable.Create(exeInstance.Name, "dotnet");
                exeSpec.Spec.WorkingDirectory = Path.GetDirectoryName(projectMetadata.ProjectPath);
 
                exeSpec.Annotate(CustomResource.OtelServiceNameAnnotation, project.Name);
                exeSpec.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, exeInstance.Suffix);
                exeSpec.Annotate(CustomResource.ResourceNameAnnotation, project.Name);
                exeSpec.Annotate(CustomResource.ResourceReplicaCount, replicas.ToString(CultureInfo.InvariantCulture));
                exeSpec.Annotate(CustomResource.ResourceReplicaIndex, i.ToString(CultureInfo.InvariantCulture));
 
                SetInitialResourceState(project, exeSpec);
 
                var projectLaunchConfiguration = new ProjectLaunchConfiguration();
                projectLaunchConfiguration.ProjectPath = projectMetadata.ProjectPath;
 
                if (!string.IsNullOrEmpty(configuration[DebugSessionPortVar]))
                {
                    exeSpec.Spec.ExecutionType = ExecutionType.IDE;
 
                    projectLaunchConfiguration.DisableLaunchProfile = project.TryGetLastAnnotation<ExcludeLaunchProfileAnnotation>(out _);
                    if (!projectLaunchConfiguration.DisableLaunchProfile && project.TryGetLastAnnotation<LaunchProfileAnnotation>(out var lpa))
                    {
                        projectLaunchConfiguration.LaunchProfile = lpa.LaunchProfileName;
                    }
                }
                else
                {
                    exeSpec.Spec.ExecutionType = ExecutionType.Process;
                    if (configuration.GetBool("DOTNET_WATCH") is not true)
                    {
                        exeSpec.Spec.Args = [
                            "run",
                        "--no-build",
                        "--project",
                        projectMetadata.ProjectPath,
                    ];
                    }
                    else
                    {
                        exeSpec.Spec.Args = [
                            "watch",
                        "--non-interactive",
                        "--no-hot-reload",
                        "--project",
                        projectMetadata.ProjectPath
                        ];
                    }
 
                    if (!string.IsNullOrEmpty(_distributedApplicationOptions.Configuration))
                    {
                        exeSpec.Spec.Args.AddRange(new[] { "-c", _distributedApplicationOptions.Configuration });
                    }
 
                    // We pretty much always want to suppress the normal launch profile handling
                    // because the settings from the profile will override the ambient environment settings, which is not what we want
                    // (the ambient environment settings for service processes come from the application model
                    // and should be HIGHER priority than the launch profile settings).
                    // This means we need to apply the launch profile settings manually--the invocation parameters here,
                    // and the environment variables/application URLs inside CreateExecutableAsync().
                    exeSpec.Spec.Args.Add("--no-launch-profile");
 
                    var launchProfile = project.GetEffectiveLaunchProfile()?.LaunchProfile;
                    if (launchProfile is not null && !string.IsNullOrWhiteSpace(launchProfile.CommandLineArgs))
                    {
                        var cmdArgs = CommandLineArgsParser.Parse(launchProfile.CommandLineArgs);
                        if (cmdArgs.Count > 0)
                        {
                            exeSpec.Spec.Args.Add("--");
                            exeSpec.Spec.Args.AddRange(cmdArgs);
                        }
                    }
                }
 
                // We want this annotation even if we are not using IDE execution; see ToSnapshot() for details.
                exeSpec.AnnotateAsObjectList(Executable.LaunchConfigurationsAnnotation, projectLaunchConfiguration);
 
                var exeAppResource = new AppResource(project, exeSpec);
                AddServicesProducedInfo(project, exeSpec, exeAppResource);
                _appResources.Add(exeAppResource);
            }
        }
    }
 
    private void EnsureRequiredAnnotations(IResource resource)
    {
        // Add the default lifecycle commands (start/stop/restart)
        resource.AddLifeCycleCommands();
 
        nameGenerator.EnsureDcpInstancesPopulated(resource);
    }
 
    private static void SetInitialResourceState(IResource resource, IAnnotationHolder annotationHolder)
    {
        // Store the initial state of the resource
        if (resource.TryGetLastAnnotation<ResourceSnapshotAnnotation>(out var initial) &&
            initial.InitialSnapshot.State?.Text is string state && !string.IsNullOrEmpty(state))
        {
            annotationHolder.Annotate(CustomResource.ResourceStateAnnotation, state);
        }
    }
 
    private Task CreateExecutablesAsync(IEnumerable<AppResource> executableResources, CancellationToken cancellationToken)
    {
        try
        {
            AspireEventSource.Instance.DcpExecutablesCreateStart();
 
            async Task CreateResourceExecutablesAsyncCore(IResource resource, IEnumerable<AppResource> executables, CancellationToken cancellationToken)
            {
                var resourceLogger = loggerService.GetLogger(resource);
 
                try
                {
                    await notificationService.PublishUpdateAsync(resource, s => s with
                    {
                        ResourceType = resource is ProjectResource ? KnownResourceTypes.Project : KnownResourceTypes.Executable,
                        Properties = [],
                        State = "Starting"
                    })
                    .ConfigureAwait(false);
 
                    await PublishConnectionStringAvailableEvent(resource, cancellationToken).ConfigureAwait(false);
 
                    var beforeResourceStartedEvent = new BeforeResourceStartedEvent(resource, serviceProvider);
                    await eventing.PublishAsync(beforeResourceStartedEvent, cancellationToken).ConfigureAwait(false);
 
                    foreach (var er in executables)
                    {
                        try
                        {
                            await CreateExecutableAsync(er, resourceLogger, cancellationToken).ConfigureAwait(false);
                        }
                        catch (FailedToApplyEnvironmentException)
                        {
                            // For this exception we don't want the noise of the stack trace, we've already
                            // provided more detail where we detected the issue (e.g. envvar name). To get
                            // more diagnostic information reduce logging level for DCP log category to Debug.
                            await notificationService.PublishUpdateAsync(er.ModelResource, er.DcpResource.Metadata.Name, s => s with { State = "FailedToStart" }).ConfigureAwait(false);
                        }
                        catch (Exception ex)
                        {
                            // The purpose of this catch block is to ensure that if an individual executable resource fails
                            // to start that it doesn't tear down the entire app host AND that we route the error to the
                            // appropriate replica.
                            resourceLogger.LogError(ex, "Failed to create resource {ResourceName}", er.ModelResource.Name);
                            await notificationService.PublishUpdateAsync(er.ModelResource, er.DcpResource.Metadata.Name, s => s with { State = "FailedToStart" }).ConfigureAwait(false);
                        }
                    }
                }
                catch (Exception ex)
                {
                    // The purpose of this catch block is to ensure that if an error processing the overall
                    // configuration of the executable resource files. This is different to the exception handling
                    // block above because at this tage of processing we don't necessarily have any replicas
                    // yet. For example if a dependency fails to start.
                    resourceLogger.LogError(ex, "Failed to create resource {ResourceName}", resource.Name);
                    await notificationService.PublishUpdateAsync(resource, s => s with { State = "FailedToStart" }).ConfigureAwait(false);
                }
            }
 
            var tasks = new List<Task>();
            foreach (var group in executableResources.GroupBy(e => e.ModelResource))
            {
                tasks.Add(CreateResourceExecutablesAsyncCore(group.Key, group, cancellationToken));
            }
 
            return Task.WhenAll(tasks);
        }
        finally
        {
            AspireEventSource.Instance.DcpExecutablesCreateStop();
        }
    }
 
    private async Task PublishConnectionStringAvailableEvent(IResource resource, CancellationToken cancellationToken)
    {
        // If the resource itself has a connection string then publish that the connection string is available.
        if (resource is IResourceWithConnectionString)
        {
            var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(resource, serviceProvider);
            await eventing.PublishAsync(connectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
        }
 
        // Sometimes the container/executable itself does not have a connection string, and in those cases
        // we need to dispatch the event for the children.
        if (_parentChildLookup[resource] is { } children)
        {
            foreach (var child in children.OfType<IResourceWithConnectionString>())
            {
                var childConnectionStringAvailableEvent = new ConnectionStringAvailableEvent(child, serviceProvider);
                await eventing.PublishAsync(childConnectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
            }
        }
    }
 
    private async Task CreateExecutableAsync(AppResource er, ILogger resourceLogger, CancellationToken cancellationToken)
    {
        ExecutableSpec spec;
        Func<Task<CustomResource>> createResource;
 
        switch (er.DcpResource)
        {
            case Executable exe:
                spec = exe.Spec;
                createResource = async () => await kubernetesService.CreateAsync(exe, cancellationToken).ConfigureAwait(false);
                break;
            case ExecutableReplicaSet ers:
                spec = ers.Spec.Template.Spec;
                createResource = async () => await kubernetesService.CreateAsync(ers, cancellationToken).ConfigureAwait(false);
                break;
            default:
                throw new InvalidOperationException($"Expected an Executable-like resource, but got {er.DcpResource.Kind} instead");
        }
 
        bool failedToApplyArgs = false;
        spec.Args ??= [];
 
        if (er.ModelResource.TryGetAnnotationsOfType<CommandLineArgsCallbackAnnotation>(out var exeArgsCallbacks))
        {
            var args = new List<object>();
            var commandLineContext = new CommandLineArgsCallbackContext(args, cancellationToken);
 
            foreach (var exeArgsCallback in exeArgsCallbacks)
            {
                await exeArgsCallback.Callback(commandLineContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                try
                {
                    var value = arg switch
                    {
                        string s => s,
                        IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: false, cancellationToken).ConfigureAwait(false),
                        null => null,
                        _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                    };
 
                    if (value is not null)
                    {
                        spec.Args.Add(value);
                    }
                }
                catch (Exception ex)
                {
                    resourceLogger.LogCritical("Failed to apply arguments. A dependency may have failed to start.");
                    _logger.LogDebug(ex, "Failed to apply arguments. A dependency may have failed to start.");
                    failedToApplyArgs = true;
                }
            }
        }
 
        var config = new Dictionary<string, object>();
        var context = new EnvironmentCallbackContext(_executionContext, config, cancellationToken)
        {
            Logger = resourceLogger
        };
 
        if (er.ModelResource.TryGetEnvironmentVariables(out var envVarAnnotations))
        {
            foreach (var ann in envVarAnnotations)
            {
                await ann.Callback(context).ConfigureAwait(false);
            }
        }
 
        bool failedToApplyConfiguration = false;
        spec.Env = [];
        foreach (var c in config)
        {
            try
            {
                var value = c.Value switch
                {
                    string s => s,
                    IValueProvider valueProvider => await GetValue(c.Key, valueProvider, resourceLogger, isContainer: false, cancellationToken).ConfigureAwait(false),
                    null => null,
                    _ => throw new InvalidOperationException($"Unexpected value for environment variable \"{c.Key}\".")
                };
 
                if (value is not null)
                {
                    spec.Env.Add(new EnvVar { Name = c.Key, Value = value });
                }
            }
            catch (Exception ex)
            {
                resourceLogger.LogCritical("Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", c.Key);
                _logger.LogDebug(ex, "Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", c.Key);
                failedToApplyConfiguration = true;
            }
        }
 
        if (failedToApplyConfiguration || failedToApplyArgs)
        {
            throw new FailedToApplyEnvironmentException();
        }
 
        await createResource().ConfigureAwait(false);
    }
 
    private async Task<string?> GetValue(string? key, IValueProvider valueProvider, ILogger logger, bool isContainer, CancellationToken cancellationToken)
    {
        var task = ExpressionResolver.ResolveAsync(isContainer, valueProvider, DefaultContainerHostName, cancellationToken);
 
        if (!task.IsCompleted)
        {
            if (valueProvider is IResource resource)
            {
                if (key is null)
                {
                    logger.LogInformation("Waiting for value from resource '{ResourceName}'", resource.Name);
                }
                else
                {
                    logger.LogInformation("Waiting for value for environment variable value '{Name}' from resource '{ResourceName}'", key, resource.Name);
                }
            }
            else if (valueProvider is ConnectionStringReference { Resource: var cs })
            {
                logger.LogInformation("Waiting for value for connection string from resource '{ResourceName}'", cs.Name);
            }
            else
            {
                if (key is null)
                {
                    logger.LogInformation("Waiting for value from {ValueProvider}.", valueProvider.ToString());
                }
                else
                {
                    logger.LogInformation("Waiting for value for environment variable value '{Name}' from {ValueProvider}.", key, valueProvider.ToString());
                }
            }
        }
 
        return await task.ConfigureAwait(false);
    }
 
    private void PrepareContainers()
    {
        var modelContainerResources = _model.GetContainerResources();
 
        foreach (var container in modelContainerResources)
        {
            if (!container.TryGetContainerImageName(out var containerImageName))
            {
                // This should never happen! In order to get into this loop we need
                // to have the annotation, if we don't have the annotation by the time
                // we get here someone is doing something wrong.
                throw new InvalidOperationException();
            }
 
            EnsureRequiredAnnotations(container);
 
            var containerObjectInstance = GetDcpInstance(container, instanceIndex: 0);
            var ctr = Container.Create(containerObjectInstance.Name, containerImageName);
 
            ctr.Spec.ContainerName = containerObjectInstance.Name; // Use the same name for container orchestrator (Docker, Podman) resource and DCP object name.
 
            if (container.GetContainerLifetimeType() == ContainerLifetime.Persistent)
            {
                ctr.Spec.Persistent = true;
            }
 
            ctr.Annotate(CustomResource.ResourceNameAnnotation, container.Name);
            ctr.Annotate(CustomResource.OtelServiceNameAnnotation, container.Name);
            ctr.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, containerObjectInstance.Suffix);
            SetInitialResourceState(container, ctr);
 
            if (container.TryGetContainerMounts(out var containerMounts))
            {
                ctr.Spec.VolumeMounts = [];
 
                foreach (var mount in containerMounts)
                {
                    var volumeSpec = new VolumeMount
                    {
                        Source = mount.Source,
                        Target = mount.Target,
                        Type = mount.Type == ContainerMountType.BindMount ? VolumeMountType.Bind : VolumeMountType.Volume,
                        IsReadOnly = mount.IsReadOnly
                    };
 
                    ctr.Spec.VolumeMounts.Add(volumeSpec);
                }
            }
 
            ctr.Spec.Networks = new List<ContainerNetworkConnection>
            {
                new ContainerNetworkConnection
                {
                    Name = DefaultAspireNetworkName,
                    Aliases = new List<string> { container.Name },
                }
            };
 
            var containerAppResource = new AppResource(container, ctr);
            AddServicesProducedInfo(container, ctr, containerAppResource);
            _appResources.Add(containerAppResource);
        }
    }
 
    /// <summary>
    /// Gets information about the resource's DCP instance. ReplicaInstancesAnnotation is added in BeforeStartEvent.
    /// </summary>
    private static DcpInstance GetDcpInstance(IResource resource, int instanceIndex)
    {
        if (!resource.TryGetLastAnnotation<DcpInstancesAnnotation>(out var replicaAnnotation))
        {
            throw new DistributedApplicationException($"Couldn't find required {nameof(DcpInstancesAnnotation)} annotation on resource {resource.Name}.");
        }
 
        foreach (var instance in replicaAnnotation.Instances)
        {
            if (instance.Index == instanceIndex)
            {
                return instance;
            }
        }
 
        throw new DistributedApplicationException($"Couldn't find required instance ID for index {instanceIndex} on resource {resource.Name}.");
    }
 
    private Task CreateContainersAsync(IEnumerable<AppResource> containerResources, CancellationToken cancellationToken)
    {
        try
        {
            AspireEventSource.Instance.DcpContainersCreateStart();
 
            async Task CreateContainerAsyncCore(AppResource cr, CancellationToken cancellationToken)
            {
                var logger = loggerService.GetLogger(cr.ModelResource);
 
                await notificationService.PublishUpdateAsync(cr.ModelResource, s => s with
                {
                    State = "Starting",
                    Properties = [
                        new(KnownProperties.Container.Image, cr.ModelResource.TryGetContainerImageName(out var imageName) ? imageName : ""),
                    ],
                    ResourceType = KnownResourceTypes.Container
                })
                .ConfigureAwait(false);
 
                await SetChildResourceAsync(cr.ModelResource, cr.DcpResource.Metadata.Name, state: "Starting", startTimeStamp: null, stopTimeStamp: null).ConfigureAwait(false);
 
                try
                {
                    await CreateContainerAsync(cr, logger, cancellationToken).ConfigureAwait(false);
                }
                catch (FailedToApplyEnvironmentException)
                {
                    // For this exception we don't want the noise of the stack trace, we've already
                    // provided more detail where we detected the issue (e.g. envvar name). To get
                    // more diagnostic information reduce logging level for DCP log category to Debug.
                    await notificationService.PublishUpdateAsync(cr.ModelResource, s => s with { State = "FailedToStart" }).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "Failed to create container resource {ResourceName}", cr.ModelResource.Name);
 
                    await notificationService.PublishUpdateAsync(cr.ModelResource, s => s with { State = "FailedToStart" }).ConfigureAwait(false);
 
                    await SetChildResourceAsync(cr.ModelResource, cr.DcpResource.Metadata.Name, state: "FailedToStart", startTimeStamp: null, stopTimeStamp: null).ConfigureAwait(false);
                }
            }
 
            var tasks = new List<Task>();
 
            // Create a custom container network for Aspire if there are container resources
            if (containerResources.Any())
            {
                // The network will be created with a unique postfix to avoid conflicts with other Aspire AppHost networks
                tasks.Add(kubernetesService.CreateAsync(ContainerNetwork.Create(DefaultAspireNetworkName), cancellationToken));
            }
 
            foreach (var cr in containerResources)
            {
                tasks.Add(CreateContainerAsyncCore(cr, cancellationToken));
            }
 
            return Task.WhenAll(tasks);
        }
        finally
        {
            AspireEventSource.Instance.DcpContainersCreateStop();
        }
    }
 
    private async Task CreateContainerAsync(AppResource cr, ILogger resourceLogger, CancellationToken cancellationToken)
    {
        await PublishConnectionStringAvailableEvent(cr.ModelResource, cancellationToken).ConfigureAwait(false);
 
        var beforeResourceStartedEvent = new BeforeResourceStartedEvent(cr.ModelResource, serviceProvider);
        await eventing.PublishAsync(beforeResourceStartedEvent, cancellationToken).ConfigureAwait(false);
 
        var dcpContainerResource = (Container)cr.DcpResource;
        var modelContainerResource = cr.ModelResource;
 
        await ApplyBuildArgumentsAsync(dcpContainerResource, modelContainerResource, cancellationToken).ConfigureAwait(false);
 
        var config = new Dictionary<string, object>();
 
        dcpContainerResource.Spec.Env = [];
 
        if (cr.ServicesProduced.Count > 0)
        {
            dcpContainerResource.Spec.Ports = new();
 
            foreach (var sp in cr.ServicesProduced)
            {
                var ea = sp.EndpointAnnotation;
 
                var portSpec = new ContainerPortSpec()
                {
                    ContainerPort = ea.TargetPort,
                };
 
                if (!ea.IsProxied && ea.Port is int)
                {
                    portSpec.HostPort = ea.Port;
                }
 
                switch (sp.EndpointAnnotation.Protocol)
                {
                    case ProtocolType.Tcp:
                        portSpec.Protocol = PortProtocol.TCP; break;
                    case ProtocolType.Udp:
                        portSpec.Protocol = PortProtocol.UDP; break;
                }
 
                dcpContainerResource.Spec.Ports.Add(portSpec);
            }
        }
 
        if (modelContainerResource.TryGetEnvironmentVariables(out var containerEnvironmentVariables))
        {
            var context = new EnvironmentCallbackContext(_executionContext, config, cancellationToken);
 
            foreach (var v in containerEnvironmentVariables)
            {
                await v.Callback(context).ConfigureAwait(false);
            }
        }
 
        bool failedToApplyConfiguration = false;
        foreach (var kvp in config)
        {
            try
            {
                var value = kvp.Value switch
                {
                    string s => s,
                    IValueProvider valueProvider => await GetValue(kvp.Key, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                    null => null,
                    _ => throw new InvalidOperationException($"Unexpected value for environment variable \"{kvp.Key}\".")
                };
 
                if (value is not null)
                {
                    dcpContainerResource.Spec.Env.Add(new EnvVar { Name = kvp.Key, Value = value });
                }
            }
            catch (Exception ex)
            {
                resourceLogger.LogCritical("Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", kvp.Key);
                _logger.LogDebug(ex, "Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", kvp.Key);
                failedToApplyConfiguration = true;
            }
        }
 
        // Apply optional extra arguments to the container run command.
        if (modelContainerResource.TryGetAnnotationsOfType<ContainerRuntimeArgsCallbackAnnotation>(out var runArgsCallback))
        {
            dcpContainerResource.Spec.RunArgs ??= [];
 
            var args = new List<object>();
 
            var containerRunArgsContext = new ContainerRuntimeArgsCallbackContext(args, cancellationToken);
 
            foreach (var callback in runArgsCallback)
            {
                await callback.Callback(containerRunArgsContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                var value = arg switch
                {
                    string s => s,
                    IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                    null => null,
                    _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                };
 
                if (value is not null)
                {
                    dcpContainerResource.Spec.RunArgs.Add(value);
                }
            }
        }
 
        var failedToApplyArgs = false;
        if (modelContainerResource.TryGetAnnotationsOfType<CommandLineArgsCallbackAnnotation>(out var argsCallback))
        {
            dcpContainerResource.Spec.Args ??= [];
 
            var args = new List<object>();
 
            var commandLineArgsContext = new CommandLineArgsCallbackContext(args, cancellationToken);
 
            foreach (var callback in argsCallback)
            {
                await callback.Callback(commandLineArgsContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                try
                {
                    var value = arg switch
                    {
                        string s => s,
                        IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                        null => null,
                        _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                    };
 
                    if (value is not null)
                    {
                        dcpContainerResource.Spec.Args.Add(value);
                    }
                }
                catch (Exception ex)
                {
                    resourceLogger.LogCritical("Failed to apply container arguments '{ConfigKey}'. A dependency may have failed to start.", arg);
                    _logger.LogDebug(ex, "Failed to apply container arguments '{ConfigKey}'. A dependency may have failed to start.", arg);
                    failedToApplyArgs = true;
                }
            }
        }
 
        if (modelContainerResource is ContainerResource containerResource)
        {
            dcpContainerResource.Spec.Command = containerResource.Entrypoint;
        }
 
        if (failedToApplyArgs || failedToApplyConfiguration)
        {
            throw new FailedToApplyEnvironmentException();
        }
 
        if (_dcpInfo is not null)
        {
            DcpDependencyCheck.CheckDcpInfoAndLogErrors(resourceLogger, _options.Value, _dcpInfo);
        }
 
        await kubernetesService.CreateAsync(dcpContainerResource, cancellationToken).ConfigureAwait(false);
    }
 
    private static async Task ApplyBuildArgumentsAsync(Container dcpContainerResource, IResource modelContainerResource, CancellationToken cancellationToken)
    {
        if (modelContainerResource.Annotations.OfType<DockerfileBuildAnnotation>().SingleOrDefault() is { } dockerfileBuildAnnotation)
        {
            var dcpBuildArgs = new List<EnvVar>();
 
            foreach (var buildArgument in dockerfileBuildAnnotation.BuildArguments)
            {
                var valueString = buildArgument.Value switch
                {
                    string stringValue => stringValue,
                    IValueProvider valueProvider => await valueProvider.GetValueAsync(cancellationToken).ConfigureAwait(false),
                    bool boolValue => boolValue ? "true" : "false",
                    _ => buildArgument.Value.ToString()
                };
 
                var dcpBuildArg = new EnvVar()
                {
                    Name = buildArgument.Key,
                    Value = valueString
                };
 
                dcpBuildArgs.Add(dcpBuildArg);
            }
 
            dcpContainerResource.Spec.Build = new()
            {
                Context = dockerfileBuildAnnotation.ContextPath,
                Dockerfile = dockerfileBuildAnnotation.DockerfilePath,
                Stage = dockerfileBuildAnnotation.Stage,
                Args = dcpBuildArgs
            };
 
            var dcpBuildSecrets = new List<BuildContextSecret>();
 
            foreach (var buildSecret in dockerfileBuildAnnotation.BuildSecrets)
            {
                var valueString = buildSecret.Value switch
                {
                    FileInfo filePath => filePath.FullName,
                    IValueProvider valueProvider => await valueProvider.GetValueAsync(cancellationToken).ConfigureAwait(false),
                    _ => throw new InvalidOperationException("Build secret can only be a parameter or a file.")
                };
 
                if (buildSecret.Value is FileInfo)
                {
                    var dcpBuildSecret = new BuildContextSecret
                    {
                        Id = buildSecret.Key,
                        Type = "file",
                        Source = valueString
                    };
                    dcpBuildSecrets.Add(dcpBuildSecret);
                }
                else
                {
                    var dcpBuildSecret = new BuildContextSecret
                    {
                        Id = buildSecret.Key,
                        Type = "env",
                        Value = valueString
                    };
                    dcpBuildSecrets.Add(dcpBuildSecret);
                }
            }
 
            dcpContainerResource.Spec.Build = new()
            {
                Context = dockerfileBuildAnnotation.ContextPath,
                Dockerfile = dockerfileBuildAnnotation.DockerfilePath,
                Stage = dockerfileBuildAnnotation.Stage,
                Args = dcpBuildArgs,
                Secrets = dcpBuildSecrets
            };
        }
    }
 
    private void AddServicesProducedInfo(IResource modelResource, IAnnotationHolder dcpResource, AppResource appResource)
    {
        string modelResourceName = "(unknown)";
        try
        {
            modelResourceName = GetObjectNameForResource(modelResource);
        }
        catch { } // For error messages only, OK to fall back to (unknown)
 
        var servicesProduced = _appResources.OfType<ServiceAppResource>().Where(r => r.ModelResource == modelResource);
        foreach (var sp in servicesProduced)
        {
            var ea = sp.EndpointAnnotation;
 
            if (modelResource.IsContainer())
            {
                if (ea.TargetPort is null)
                {
                    throw new InvalidOperationException($"The endpoint '{ea.Name}' for container resource '{modelResourceName}' must specify the {nameof(EndpointAnnotation.TargetPort)} value");
                }
            }
            else if (!ea.IsProxied)
            {
                if (HasMultipleReplicas(appResource.DcpResource))
                {
                    throw new InvalidOperationException($"Resource '{modelResourceName}' uses multiple replicas and a proxy-less endpoint '{ea.Name}'. These features do not work together.");
                }
 
                if (ea.Port is int && ea.Port != ea.TargetPort)
                {
                    throw new InvalidOperationException($"The endpoint '{ea.Name}' for resource '{modelResourceName}' is not using a proxy, and it has a value of {nameof(EndpointAnnotation.Port)} property that is different from the value of {nameof(EndpointAnnotation.TargetPort)} property. For proxy-less endpoints they must match.");
                }
            }
            else
            {
                Debug.Assert(ea.IsProxied);
 
                if (ea.TargetPort is int && ea.Port is int && ea.TargetPort == ea.Port)
                {
                    throw new InvalidOperationException(
                        $"The endpoint '{ea.Name}' for resource '{modelResourceName}' requested a proxy ({nameof(ea.IsProxied)} is true). Non-container resources cannot be proxied when both {nameof(ea.TargetPort)} and {nameof(ea.Port)} are specified with the same value.");
                }
 
                if (HasMultipleReplicas(appResource.DcpResource) && ea.TargetPort is int)
                {
                    throw new InvalidOperationException(
                        $"Resource '{modelResourceName}' can have multiple replicas, and it uses endpoint '{ea.Name}' that has {nameof(ea.TargetPort)} property set. Each replica must have a unique port; setting {nameof(ea.TargetPort)} is not allowed.");
                }
            }
 
            var spAnn = new ServiceProducerAnnotation(sp.Service.Metadata.Name);
            spAnn.Port = ea.TargetPort;
            dcpResource.AnnotateAsObjectList(CustomResource.ServiceProducerAnnotation, spAnn);
            appResource.ServicesProduced.Add(sp);
        }
 
        static bool HasMultipleReplicas(CustomResource resource)
        {
            if (resource is ExecutableReplicaSet ers && ers.Spec.Replicas > 1)
            {
                return true;
            }
            if (resource is Executable exe && exe.Metadata.Annotations.TryGetValue(CustomResource.ResourceReplicaCount, out var value) && int.TryParse(value, CultureInfo.InvariantCulture, out var replicas) && replicas > 1)
            {
                return true;
            }
            return false;
        }
    }
 
    private async Task CreateResourcesAsync<RT>(CancellationToken cancellationToken) where RT : CustomResource
    {
        try
        {
            var resourcesToCreate = _appResources.Select(r => r.DcpResource).OfType<RT>();
            if (!resourcesToCreate.Any())
            {
                return;
            }
 
            // CONSIDER batched creation
            foreach (var res in resourcesToCreate)
            {
                await kubernetesService.CreateAsync(res, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException ex)
        {
            // We catch and suppress the OperationCancelledException because the user may CTRL-C
            // during start up of the resources.
            _logger.LogDebug(ex, "Cancellation during creation of resources.");
        }
    }
 
    private string GetObjectNameForResource(IResource resource, string suffix = "")
    {
        if (resource.TryGetLastAnnotation<ContainerNameAnnotation>(out var containerNameAnnotation))
        {
            // If an explicit container name is provided, use it without any postfix
            return containerNameAnnotation.Name;
        }
 
        static string maybeWithSuffix(string s, string localSuffix, string? globalSuffix)
            => (string.IsNullOrWhiteSpace(localSuffix), string.IsNullOrWhiteSpace(globalSuffix)) switch
            {
                (true, true) => s,
                (false, true) => $"{s}-{localSuffix}",
                (true, false) => $"{s}-{globalSuffix}",
                (false, false) => $"{s}-{localSuffix}-{globalSuffix}"
            };
        return maybeWithSuffix(resource.Name, suffix, _options.Value.ResourceNameSuffix);
    }
 
    private static string GenerateUniqueServiceName(HashSet<string> serviceNames, string candidateName)
    {
        int suffix = 1;
        string uniqueName = candidateName;
 
        while (!serviceNames.Add(uniqueName))
        {
            uniqueName = $"{candidateName}-{suffix}";
            suffix++;
            if (suffix == 100)
            {
                // Should never happen, but we do not want to ever get into a infinite loop situation either.
                throw new ArgumentException($"Could not generate a unique name for service '{candidateName}'");
            }
        }
 
        return uniqueName;
    }
 
    private static string GetRandomNameSuffix()
    {
        // RandomNameSuffixLength of lowercase characters
        var suffix = PasswordGenerator.Generate(RandomNameSuffixLength, true, false, false, false, RandomNameSuffixLength, 0, 0, 0);
        return suffix;
    }
 
    public async Task DeleteResourcesAsync(CancellationToken cancellationToken = default)
    {
        try
        {
            AspireEventSource.Instance.DcpModelCleanupStart();
            await DeleteResourcesAsync<ExecutableReplicaSet>("project", cancellationToken).ConfigureAwait(false);
            await DeleteResourcesAsync<Executable>("project", cancellationToken).ConfigureAwait(false);
            await DeleteResourcesAsync<Container>("container", cancellationToken).ConfigureAwait(false);
            await DeleteResourcesAsync<Service>("service", cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected
            _logger.LogDebug("Cancellation received while deleting resources.");
        }
        finally
        {
            AspireEventSource.Instance.DcpModelCleanupStop();
            _appResources.Clear();
        }
    }
 
    private async Task DeleteResourcesAsync<TResource>(string resourceType, CancellationToken cancellationToken) where TResource : CustomResource
    {
        var resourcesToDelete = _appResources.Select(r => r.DcpResource).OfType<TResource>();
        if (!resourcesToDelete.Any())
        {
            return;
        }
 
        foreach (var res in resourcesToDelete)
        {
            try
            {
                await kubernetesService.DeleteAsync<TResource>(res.Metadata.Name, res.Metadata.NamespaceProperty, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogInformation(ex, "Could not stop {ResourceType} '{ResourceName}'.", resourceType, res.Metadata.Name);
            }
        }
    }
 
    /// <summary>
    /// Create a patch update using the specified resource.
    /// A copy is taken of the resource to avoid permanently changing it.
    /// </summary>
    internal static V1Patch CreatePatch<T>(T obj, Action<T> change) where T : CustomResource
    {
        // This method isn't very efficient.
        // If mass or frequent patches are required then we may want to create patches manually.
        var current = JsonSerializer.SerializeToNode(obj);
 
        var copy = JsonSerializer.Deserialize<T>(current)!;
        change(copy);
 
        var changed = JsonSerializer.SerializeToNode(copy);
 
        var jsonPatch = current.CreatePatch(changed);
        return new V1Patch(jsonPatch, V1Patch.PatchType.JsonPatch);
    }
 
    internal async Task StopResourceAsync(string resourceName, CancellationToken cancellationToken)
    {
        var matchingResource = GetMatchingResource(resourceName);
 
        V1Patch patch;
        switch (matchingResource.DcpResource)
        {
            case Container c:
                patch = CreatePatch(c, obj => obj.Spec.Stop = true);
                await kubernetesService.PatchAsync(c, patch, cancellationToken).ConfigureAwait(false);
                break;
            case Executable e:
                patch = CreatePatch(e, obj => obj.Spec.Stop = true);
                await kubernetesService.PatchAsync(e, patch, cancellationToken).ConfigureAwait(false);
                break;
            case ExecutableReplicaSet rs:
                patch = CreatePatch(rs, obj => obj.Spec.Replicas = 0);
                await kubernetesService.PatchAsync(rs, patch, cancellationToken).ConfigureAwait(false);
                break;
            default:
                throw new InvalidOperationException($"Unexpected resource type: {matchingResource.DcpResource.GetType().FullName}");
        }
    }
 
    private AppResource GetMatchingResource(string resourceName)
    {
        var matchingResource = _appResources
            .Where(r => r.DcpResource is not Service)
            .SingleOrDefault(r => string.Equals(r.DcpResource.Metadata.Name, resourceName, StringComparisons.ResourceName));
        if (matchingResource == null)
        {
            throw new InvalidOperationException($"Resource '{resourceName}' not found.");
        }
 
        return matchingResource;
    }
 
    internal async Task StartResourceAsync(string resourceName, CancellationToken cancellationToken)
    {
        var matchingResource = GetMatchingResource(resourceName);
 
        switch (matchingResource.DcpResource)
        {
            case Container c:
                await StartExecutableOrContainerAsync(c).ConfigureAwait(false);
                break;
            case Executable e:
                await StartExecutableOrContainerAsync(e).ConfigureAwait(false);
                break;
            case ExecutableReplicaSet rs:
                var replicas = matchingResource.ModelResource.GetReplicaCount();
                var patch = CreatePatch(rs, obj => obj.Spec.Replicas = replicas);
 
                await kubernetesService.PatchAsync(rs, patch, cancellationToken).ConfigureAwait(false);
                break;
            default:
                throw new InvalidOperationException($"Unexpected resource type: {matchingResource.DcpResource.GetType().FullName}");
        }
 
        async Task StartExecutableOrContainerAsync<T>(T resource) where T : CustomResource
        {
            var resourceName = resource.Metadata.Name;
            _logger.LogDebug("Starting {ResouceType} '{ResourceName}'.", typeof(T).Name, resourceName);
 
            var resourceNotFound = false;
            try
            {
                await kubernetesService.DeleteAsync<T>(resourceName, cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch (HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
            {
                // No-op if the resource wasn't found.
                // This could happen in a race condition, e.g. double clicking start button.
                resourceNotFound = true;
            }
 
            // Ensure resource is deleted. DeleteAsync returns before the resource is completely deleted so we must poll
            // to discover when it is safe to recreate the resource. This is required because the resources share the same name.
            // Deleting a resource might take a while (more than 10 seconds), because DCP tries to gracefully shut it down first
            // before resorting to more extreme measures.
            if (!resourceNotFound)
            {
                var ensureDeleteRetryStrategy = new RetryStrategyOptions()
                {
                    BackoffType = DelayBackoffType.Exponential,
                    Delay = TimeSpan.FromMilliseconds(200),
                    UseJitter = true,
                    MaxRetryAttempts = 10, // Cumulative time for all attempts amounts to about 15 seconds
                    MaxDelay = TimeSpan.FromSeconds(3),
                    ShouldHandle = new PredicateBuilder().Handle<Exception>(),
                    OnRetry = (retry) =>
                    {
                        _logger.LogDebug("Retrying check for deleted resource '{ResourceName}'. Attempt: {Attempt}", resourceName, retry.AttemptNumber);
                        return ValueTask.CompletedTask;
                    }
                };
 
                var execution = new ResiliencePipelineBuilder().AddRetry(ensureDeleteRetryStrategy).Build();
 
                await execution.ExecuteAsync(async (attemptCancellationToken) =>
                {
                    try
                    {
                        await kubernetesService.GetAsync<T>(resource.Metadata.Name, cancellationToken: attemptCancellationToken).ConfigureAwait(false);
                        throw new DistributedApplicationException($"Failed to delete '{resource.Metadata.Name}' successfully before restart.");
                    }
                    catch (HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
                    {
                        // Success.
                    }
                }, cancellationToken).ConfigureAwait(false);
            }
 
            await kubernetesService.CreateAsync(resource, cancellationToken).ConfigureAwait(false);
        }
    }
}