File: Dcp\DcpExecutor.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Data;
using System.Diagnostics;
using System.Globalization;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading.Channels;
using Aspire.Dashboard.ConsoleLogs;
using Aspire.Dashboard.Model;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.ConsoleLogs;
using Aspire.Hosting.Dashboard;
using Aspire.Hosting.Dcp.Model;
using Aspire.Hosting.Utils;
using Json.Patch;
using k8s;
using k8s.Autorest;
using k8s.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
 
namespace Aspire.Hosting.Dcp;
 
internal sealed class DcpExecutor : IDcpExecutor
{
    private const string DebugSessionPortVar = "DEBUG_SESSION_PORT";
    private const string DefaultAspireNetworkName = "default-aspire-network";
 
    private readonly ILogger<DistributedApplication> _distributedApplicationLogger;
    private readonly IKubernetesService _kubernetesService;
    private readonly IConfiguration _configuration;
    private readonly ResourceLoggerService _loggerService;
    private readonly IDcpDependencyCheckService _dcpDependencyCheckService;
    private readonly DcpNameGenerator _nameGenerator;
    private readonly ILogger<DcpExecutor> _logger;
    private readonly DistributedApplicationModel _model;
    private readonly DistributedApplicationOptions _distributedApplicationOptions;
    private readonly IOptions<DcpOptions> _options;
    private readonly DistributedApplicationExecutionContext _executionContext;
    private readonly List<AppResource> _appResources = [];
    private readonly CancellationTokenSource _shutdownCancellation = new();
    private readonly DcpExecutorEvents _executorEvents;
 
    private readonly DcpResourceState _resourceState;
    private readonly ResourceSnapshotBuilder _snapshotBuilder;
 
    // Internal for testing.
    internal ResiliencePipeline DeleteResourceRetryPipeline { get; set; }
    internal ResiliencePipeline CreateServiceRetryPipeline { get; set; }
    internal ResiliencePipeline WatchResourceRetryPipeline { get; set; }
 
    private readonly ConcurrentDictionary<string, (CancellationTokenSource Cancellation, Task Task)> _logStreams = new();
    private DcpInfo? _dcpInfo;
    private Task? _resourceWatchTask;
 
    private readonly record struct LogInformationEntry(string ResourceName, bool? LogsAvailable, bool? HasSubscribers);
    private readonly Channel<LogInformationEntry> _logInformationChannel = Channel.CreateUnbounded<LogInformationEntry>(
        new UnboundedChannelOptions { SingleReader = true });
 
    public DcpExecutor(ILogger<DcpExecutor> logger,
                       ILogger<DistributedApplication> distributedApplicationLogger,
                       DistributedApplicationModel model,
                       IKubernetesService kubernetesService,
                       IConfiguration configuration,
                       DistributedApplicationOptions distributedApplicationOptions,
                       IOptions<DcpOptions> options,
                       DistributedApplicationExecutionContext executionContext,
                       ResourceLoggerService loggerService,
                       IDcpDependencyCheckService dcpDependencyCheckService,
                       DcpNameGenerator nameGenerator,
                       DcpExecutorEvents executorEvents)
    {
        _distributedApplicationLogger = distributedApplicationLogger;
        _kubernetesService = kubernetesService;
        _configuration = configuration;
        _loggerService = loggerService;
        _dcpDependencyCheckService = dcpDependencyCheckService;
        _nameGenerator = nameGenerator;
        _executorEvents = executorEvents;
        _logger = logger;
        _model = model;
        _distributedApplicationOptions = distributedApplicationOptions;
        _options = options;
        _executionContext = executionContext;
        _resourceState = new(model.Resources.ToDictionary(r => r.Name));
        _snapshotBuilder = new(_resourceState);
 
        DeleteResourceRetryPipeline = DcpPipelineBuilder.BuildDeleteRetryPipeline(logger);
        CreateServiceRetryPipeline = DcpPipelineBuilder.BuildCreateServiceRetryPipeline(options.Value, logger);
        WatchResourceRetryPipeline = DcpPipelineBuilder.BuildWatchResourcePipeline(logger);
    }
 
    private string DefaultContainerHostName => _configuration["AppHost:ContainerHostname"] ?? _dcpInfo?.Containers?.ContainerHostName ?? "host.docker.internal";
 
    public async Task RunApplicationAsync(CancellationToken cancellationToken = default)
    {
        AspireEventSource.Instance.DcpModelCreationStart();
 
        _dcpInfo = await _dcpDependencyCheckService.GetDcpInfoAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
 
        Debug.Assert(_dcpInfo is not null, "DCP info should not be null at this point");
 
        try
        {
            PrepareServices();
            PrepareContainers();
            PrepareExecutables();
 
            await _executorEvents.PublishAsync(new OnResourcesPreparedContext(cancellationToken)).ConfigureAwait(false);
 
            // Watch for changes to the resource state.
            WatchResourceChanges();
 
            await CreateServicesAsync(cancellationToken).ConfigureAwait(false);
 
            await CreateContainerNetworksAsync(cancellationToken).ConfigureAwait(false);
 
            await CreateContainersAndExecutablesAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            _shutdownCancellation.Cancel();
            throw;
        }
        finally
        {
            AspireEventSource.Instance.DcpModelCreationStop();
        }
    }
 
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _shutdownCancellation.Cancel();
        var tasks = new List<Task>();
        if (_resourceWatchTask is { } resourceTask)
        {
            tasks.Add(resourceTask);
        }
 
        foreach (var (_, (cancellation, logTask)) in _logStreams)
        {
            cancellation.Cancel();
            tasks.Add(logTask);
        }
 
        try
        {
            await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Ignore.
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "One or more monitoring tasks terminated with an error.");
        }
 
        try
        {
            // The app orchestrator (represented by kubernetesService here) will perform a resource cleanup
            // (if not done already) when the app host process exits.
            // This is just a perf optimization, so we do not care that much if this call fails.
            // There is not much difference for single app run, but for tests that tend to launch multiple instances
            // of app host from the same process, the gain from programmatic orchestrator shutdown is significant
            // See https://github.com/dotnet/aspire/issues/6561 for more info.
            await _kubernetesService.StopServerAsync(Model.ResourceCleanup.Full, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Ignore.
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Application orchestrator could not be stopped programmatically.");
        }
    }
 
    private void WatchResourceChanges()
    {
        var outputSemaphore = new SemaphoreSlim(1);
 
        var cancellationToken = _shutdownCancellation.Token;
        var watchResourcesTask = Task.Run(async () =>
        {
            using (outputSemaphore)
            {
                await Task.WhenAll(
                    Task.Run(() => WatchKubernetesResourceAsync<Executable>((t, r) => ProcessResourceChange(t, r, _resourceState.ExecutablesMap, "Executable", (e, s) => _snapshotBuilder.ToSnapshot(e, s)))),
                    Task.Run(() => WatchKubernetesResourceAsync<Container>((t, r) => ProcessResourceChange(t, r, _resourceState.ContainersMap, "Container", (c, s) => _snapshotBuilder.ToSnapshot(c, s)))),
                    Task.Run(() => WatchKubernetesResourceAsync<Service>(ProcessServiceChange)),
                    Task.Run(() => WatchKubernetesResourceAsync<Endpoint>(ProcessEndpointChange))).ConfigureAwait(false);
            }
        });
 
        var watchSubscribersTask = Task.Run(async () =>
        {
            await foreach (var subscribers in _loggerService.WatchAnySubscribersAsync(cancellationToken).ConfigureAwait(false))
            {
                _logInformationChannel.Writer.TryWrite(new(subscribers.Name, LogsAvailable: null, subscribers.AnySubscribers));
            }
        });
 
        // Listen to the "log information channel" - which contains updates when resources have logs available and when they have subscribers.
        // A resource needs both logs available and subscribers before it starts streaming its logs.
        // We only want to start the log stream for resources when they have subscribers.
        // And when there are no more subscribers, we want to stop the stream.
        var watchInformationChannelTask = Task.Run(async () =>
        {
            var resourceLogState = new Dictionary<string, (bool logsAvailable, bool hasSubscribers)>();
 
            await foreach (var entry in _logInformationChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                var logsAvailable = false;
                var hasSubscribers = false;
                if (resourceLogState.TryGetValue(entry.ResourceName, out (bool, bool) stateEntry))
                {
                    (logsAvailable, hasSubscribers) = stateEntry;
                }
 
                // LogsAvailable can only go from false => true. Once it is true, it can never go back to false.
                Debug.Assert(!entry.LogsAvailable.HasValue || entry.LogsAvailable.Value, "entry.LogsAvailable should never be 'false'");
 
                logsAvailable = entry.LogsAvailable ?? logsAvailable;
                hasSubscribers = entry.HasSubscribers ?? hasSubscribers;
 
                if (logsAvailable)
                {
                    if (hasSubscribers)
                    {
                        if (_resourceState.ContainersMap.TryGetValue(entry.ResourceName, out var container))
                        {
                            StartLogStream(container);
                        }
                        else if (_resourceState.ExecutablesMap.TryGetValue(entry.ResourceName, out var executable))
                        {
                            StartLogStream(executable);
                        }
                    }
                    else
                    {
                        if (_logStreams.TryRemove(entry.ResourceName, out var logStream))
                        {
                            logStream.Cancellation.Cancel();
                        }
                    }
                }
 
                resourceLogState[entry.ResourceName] = (logsAvailable, hasSubscribers);
            }
        });
 
        _resourceWatchTask = Task.WhenAll(watchResourcesTask, watchSubscribersTask, watchInformationChannelTask);
 
        async Task WatchKubernetesResourceAsync<T>(Func<WatchEventType, T, Task> handler) where T : CustomResource
        {
            try
            {
                _logger.LogDebug("Watching over DCP {ResourceType} resources.", typeof(T).Name);
                await WatchResourceRetryPipeline.ExecuteAsync(async (pipelineCancellationToken) =>
                {
                    await foreach (var (eventType, resource) in _kubernetesService.WatchAsync<T>(cancellationToken: pipelineCancellationToken).ConfigureAwait<(global::k8s.WatchEventType, T)>(false))
                    {
                        await outputSemaphore.WaitAsync(pipelineCancellationToken).ConfigureAwait(false);
 
                        try
                        {
                            await handler(eventType, resource).ConfigureAwait(false);
                        }
                        finally
                        {
                            outputSemaphore.Release();
                        }
                    }
                }, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown requested.
                _logger.LogDebug("Cancellation received while watching {ResourceType} resources.", typeof(T).Name);
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Watch task over Kubernetes {ResourceType} resources terminated unexpectedly.", typeof(T).Name);
            }
            finally
            {
                _logger.LogDebug("Stopped watching {ResourceType} resources.", typeof(T).Name);
            }
        }
    }
 
    private async Task ProcessResourceChange<T>(WatchEventType watchEventType, T resource, ConcurrentDictionary<string, T> resourceByName, string resourceKind, Func<T, CustomResourceSnapshot, CustomResourceSnapshot> snapshotFactory) where T : CustomResource
    {
        if (ProcessResourceChange(resourceByName, watchEventType, resource))
        {
            UpdateAssociatedServicesMap();
 
            var changeType = watchEventType switch
            {
                WatchEventType.Added or WatchEventType.Modified => ResourceSnapshotChangeType.Upsert,
                WatchEventType.Deleted => ResourceSnapshotChangeType.Delete,
                _ => throw new System.ComponentModel.InvalidEnumArgumentException($"Cannot convert {nameof(WatchEventType)} with value {watchEventType} into enum of type {nameof(ResourceSnapshotChangeType)}.")
            };
 
            // Find the associated application model resource and update it.
            var resourceName = resource.AppModelResourceName;
 
            if (resourceName is not null &&
                _resourceState.ApplicationModel.TryGetValue(resourceName, out var appModelResource))
            {
                if (changeType == ResourceSnapshotChangeType.Delete)
                {
                    // Stop the log stream for the resource
                    if (_logStreams.TryRemove(resource.Metadata.Name, out var logStream))
                    {
                        logStream.Cancellation.Cancel();
                    }
 
                    // TODO: Handle resource deletion
                    if (_logger.IsEnabled(LogLevel.Trace))
                    {
                        _logger.LogTrace("Deleting application model resource {ResourceName} with {ResourceKind} resource {ResourceName}", appModelResource.Name, resourceKind, resource.Metadata.Name);
                    }
                }
                else
                {
                    if (_logger.IsEnabled(LogLevel.Trace))
                    {
                        _logger.LogTrace("Updating application model resource {ResourceName} with {ResourceKind} resource {ResourceName}", appModelResource.Name, resourceKind, resource.Metadata.Name);
                    }
 
                    var resourceType = GetResourceType(resource, appModelResource);
                    var status = GetResourceStatus(resource);
                    await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, resourceType, appModelResource, resource.Metadata.Name, status, s => snapshotFactory(resource, s))).ConfigureAwait(false);
 
                    if (resource is Container { LogsAvailable: true } ||
                        resource is Executable { LogsAvailable: true })
                    {
                        _logInformationChannel.Writer.TryWrite(new(resource.Metadata.Name, LogsAvailable: true, HasSubscribers: null));
                    }
                }
            }
            else
            {
                // No application model resource found for the DCP resource.
                if (_logger.IsEnabled(LogLevel.Trace))
                {
                    _logger.LogTrace("No application model resource found for {ResourceKind} resource {ResourceName}", resourceKind, resource.Metadata.Name);
                }
            }
        }
 
        void UpdateAssociatedServicesMap()
        {
            // We keep track of associated services for the resource
            // So whenever we get the service we can figure out if the service can generate endpoint for the resource
            if (watchEventType == WatchEventType.Deleted)
            {
                _resourceState.ResourceAssociatedServicesMap.Remove((resourceKind, resource.Metadata.Name), out _);
            }
            else if (resource.Metadata.Annotations?.TryGetValue(CustomResource.ServiceProducerAnnotation, out var servicesProducedAnnotationJson) == true)
            {
                var serviceProducerAnnotations = JsonSerializer.Deserialize<ServiceProducerAnnotation[]>(servicesProducedAnnotationJson);
                if (serviceProducerAnnotations is not null)
                {
                    _resourceState.ResourceAssociatedServicesMap[(resourceKind, resource.Metadata.Name)]
                        = serviceProducerAnnotations.Select(e => e.ServiceName).ToList();
                }
            }
        }
    }
 
    private static string GetResourceType<T>(T resource, IResource appModelResource) where T : CustomResource
    {
        return resource switch
        {
            Container => KnownResourceTypes.Container,
            Executable => appModelResource is ProjectResource ? KnownResourceTypes.Project : KnownResourceTypes.Executable,
            _ => throw new InvalidOperationException($"Unknown resource type {resource.GetType().Name}")
        };
    }
 
    private static ResourceStatus GetResourceStatus(CustomResource resource)
    {
        if (resource is Container container)
        {
            return new(container.Status?.State, container.Status?.StartupTimestamp?.ToUniversalTime(), container.Status?.FinishTimestamp?.ToUniversalTime());
        }
        if (resource is Executable executable)
        {
            return new(executable.Status?.State, executable.Status?.StartupTimestamp?.ToUniversalTime(), executable.Status?.FinishTimestamp?.ToUniversalTime());
        }
        return new(null, null, null);
    }
 
    private void StartLogStream<T>(T resource) where T : CustomResource
    {
        IAsyncEnumerable<IReadOnlyList<(string, bool)>>? enumerable = resource switch
        {
            Container c when c.LogsAvailable => new ResourceLogSource<T>(_logger, _kubernetesService, resource),
            Executable e when e.LogsAvailable => new ResourceLogSource<T>(_logger, _kubernetesService, resource),
            _ => null
        };
 
        // No way to get logs for this resource as yet
        if (enumerable is null)
        {
            return;
        }
 
        // This does not run concurrently for the same resource so we can safely use GetOrAdd without
        // creating multiple log streams.
        _logStreams.GetOrAdd(resource.Metadata.Name, (_) =>
        {
            var cancellation = new CancellationTokenSource();
 
            var task = Task.Run(async () =>
            {
                try
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug("Starting log streaming for {ResourceName}.", resource.Metadata.Name);
                    }
 
                    // Pump the logs from the enumerable into the logger
                    var logger = _loggerService.GetInternalLogger(resource.Metadata.Name);
 
                    await foreach (var batch in enumerable.WithCancellation(cancellation.Token).ConfigureAwait(false))
                    {
                        foreach (var (content, isError) in batch)
                        {
                            DateTime? timestamp = null;
                            var resolvedContent = content;
 
                            if (TimestampParser.TryParseConsoleTimestamp(resolvedContent, out var result))
                            {
                                resolvedContent = result.Value.ModifiedText;
                                timestamp = result.Value.Timestamp.UtcDateTime;
                            }
 
                            logger(LogEntry.Create(timestamp, resolvedContent, content, isError));
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ignore
                    _logger.LogDebug("Log streaming for {ResourceName} was cancelled.", resource.Metadata.Name);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error streaming logs for {ResourceName}.", resource.Metadata.Name);
                }
            },
            cancellation.Token);
 
            return (cancellation, task);
        });
    }
 
    private async Task ProcessEndpointChange(WatchEventType watchEventType, Endpoint endpoint)
    {
        if (!ProcessResourceChange(_resourceState.EndpointsMap, watchEventType, endpoint))
        {
            return;
        }
 
        if (endpoint.Metadata.OwnerReferences is null)
        {
            return;
        }
 
        foreach (var ownerReference in endpoint.Metadata.OwnerReferences)
        {
            await TryRefreshResource(ownerReference.Kind, ownerReference.Name).ConfigureAwait(false);
        }
    }
 
    private async Task ProcessServiceChange(WatchEventType watchEventType, Service service)
    {
        if (!ProcessResourceChange(_resourceState.ServicesMap, watchEventType, service))
        {
            return;
        }
 
        foreach (var ((resourceKind, resourceName), _) in _resourceState.ResourceAssociatedServicesMap.Where(e => e.Value.Contains(service.Metadata.Name)))
        {
            await TryRefreshResource(resourceKind, resourceName).ConfigureAwait(false);
        }
    }
 
    private async ValueTask TryRefreshResource(string resourceKind, string resourceName)
    {
        CustomResource? cr = resourceKind switch
        {
            "Container" => _resourceState.ContainersMap.TryGetValue(resourceName, out var container) ? container : null,
            "Executable" => _resourceState.ExecutablesMap.TryGetValue(resourceName, out var executable) ? executable : null,
            _ => null
        };
 
        if (cr is not null)
        {
            var appModelResourceName = cr.AppModelResourceName;
 
            if (appModelResourceName is not null &&
                _resourceState.ApplicationModel.TryGetValue(appModelResourceName, out var appModelResource))
            {
                var status = GetResourceStatus(cr);
                await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, resourceKind, appModelResource, resourceName, status, s =>
                {
                    if (cr is Container container)
                    {
                        return _snapshotBuilder.ToSnapshot(container, s);
                    }
                    else if (cr is Executable exe)
                    {
                        return _snapshotBuilder.ToSnapshot(exe, s);
                    }
                    return s;
                })).ConfigureAwait(false);
            }
        }
    }
 
    private static bool ProcessResourceChange<T>(ConcurrentDictionary<string, T> map, WatchEventType watchEventType, T resource)
            where T : CustomResource
    {
        switch (watchEventType)
        {
            case WatchEventType.Added:
                map.TryAdd(resource.Metadata.Name, resource);
                break;
 
            case WatchEventType.Modified:
                map[resource.Metadata.Name] = resource;
                break;
 
            case WatchEventType.Deleted:
                map.Remove(resource.Metadata.Name, out _);
                break;
 
            default:
                return false;
        }
 
        return true;
    }
 
    private async Task CreateServicesAsync(CancellationToken cancellationToken = default)
    {
        try
        {
            AspireEventSource.Instance.DcpServicesCreationStart();
 
            var needAddressAllocated = _appResources.OfType<ServiceAppResource>()
                .Where(sr => !sr.Service.HasCompleteAddress && sr.Service.Spec.AddressAllocationMode != AddressAllocationModes.Proxyless)
                .ToList();
 
            await CreateResourcesAsync<Service>(cancellationToken).ConfigureAwait(false);
 
            if (needAddressAllocated.Count == 0)
            {
                // No need to wait for any updates to Service objects from the orchestrator.
                return;
            }
 
            await CreateServiceRetryPipeline.ExecuteAsync(async (attemptCancellationToken) =>
            {
                var serviceChangeEnumerator = _kubernetesService.WatchAsync<Service>(cancellationToken: attemptCancellationToken);
                await foreach (var (evt, updated) in serviceChangeEnumerator.ConfigureAwait(false))
                {
                    if (evt == WatchEventType.Bookmark)
                    {
                        // Bookmarks do not contain any data.
                        continue;
                    }
 
                    var srvResource = needAddressAllocated.FirstOrDefault(sr => sr.Service.Metadata.Name == updated.Metadata.Name);
                    if (srvResource == null)
                    {
                        // This service most likely already has full address information, so it is not on needAddressAllocated list.
                        continue;
                    }
 
                    if (updated.HasCompleteAddress)
                    {
                        srvResource.Service.ApplyAddressInfoFrom(updated);
                        needAddressAllocated.Remove(srvResource);
                    }
 
                    if (needAddressAllocated.Count == 0)
                    {
                        return; // We are done
                    }
                }
            }, cancellationToken).ConfigureAwait(false);
 
            // If there are still services that need address allocated, try a final direct query in case the watch missed some updates.
            foreach (var sar in needAddressAllocated)
            {
                var dcpSvc = await _kubernetesService.GetAsync<Service>(sar.Service.Metadata.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
                if (dcpSvc.HasCompleteAddress)
                {
                    sar.Service.ApplyAddressInfoFrom(dcpSvc);
                }
                else
                {
                    _distributedApplicationLogger.LogWarning("Unable to allocate a network port for service '{ServiceName}'; service may be unreachable and its clients may not work properly.", sar.Service.Metadata.Name);
                }
            }
 
        }
        finally
        {
            AspireEventSource.Instance.DcpServicesCreationStop();
        }
    }
 
    private async Task CreateContainerNetworksAsync(CancellationToken cancellationToken)
    {
        var toCreate = _appResources.Where(r => r.DcpResource is ContainerNetwork);
        foreach (var containerNetwork in toCreate)
        {
            if (containerNetwork.DcpResource is ContainerNetwork cn)
            {
                await _kubernetesService.CreateAsync(cn, cancellationToken).ConfigureAwait(false);
            }
        }
    }
 
    private async Task CreateContainersAndExecutablesAsync(CancellationToken cancellationToken)
    {
        var toCreate = _appResources.Where(r => r.DcpResource is Container || r.DcpResource is Executable);
        AddAllocatedEndpointInfo(toCreate);
 
        await _executorEvents.PublishAsync(new OnEndpointsAllocatedContext(cancellationToken)).ConfigureAwait(false);
 
        var containersTask = CreateContainersAsync(toCreate.Where(ar => ar.DcpResource is Container), cancellationToken);
        var executablesTask = CreateExecutablesAsync(toCreate.Where(ar => ar.DcpResource is Executable), cancellationToken);
 
        await Task.WhenAll(containersTask, executablesTask).ConfigureAwait(false);
    }
 
    private void AddAllocatedEndpointInfo(IEnumerable<AppResource> resources)
    {
        var containerHost = DefaultContainerHostName;
 
        foreach (var appResource in resources)
        {
            foreach (var sp in appResource.ServicesProduced)
            {
                var svc = (Service)sp.DcpResource;
 
                if (!svc.HasCompleteAddress && sp.EndpointAnnotation.IsProxied)
                {
                    // This should never happen; if it does, we have a bug without a workaround for th the user.
                    throw new InvalidDataException($"Service {svc.Metadata.Name} should have valid address at this point");
                }
 
                if (!sp.EndpointAnnotation.IsProxied && svc.AllocatedPort is null)
                {
                    throw new InvalidOperationException($"Service '{svc.Metadata.Name}' needs to specify a port for endpoint '{sp.EndpointAnnotation.Name}' since it isn't using a proxy.");
                }
 
                sp.EndpointAnnotation.AllocatedEndpoint = new AllocatedEndpoint(
                    sp.EndpointAnnotation,
                    "localhost",
                    (int)svc.AllocatedPort!,
                    containerHostAddress: appResource.ModelResource.IsContainer() ? containerHost : null,
                    targetPortExpression: $$$"""{{- portForServing "{{{svc.Metadata.Name}}}" -}}""");
            }
        }
    }
 
    private void PrepareServices()
    {
        var serviceProducers = _model.Resources
            .Select(r => (ModelResource: r, Endpoints: r.Annotations.OfType<EndpointAnnotation>().ToArray()))
            .Where(sp => sp.Endpoints.Any());
 
        // We need to ensure that Services have unique names (otherwise we cannot really distinguish between
        // services produced by different resources).
        var serviceNames = new HashSet<string>();
 
        foreach (var sp in serviceProducers)
        {
            var endpoints = sp.Endpoints;
 
            foreach (var endpoint in endpoints)
            {
                var serviceName = _nameGenerator.GetServiceName(sp.ModelResource, endpoint, endpoints.Length > 1, serviceNames);
                var svc = Service.Create(serviceName);
 
                if (!sp.ModelResource.SupportsProxy())
                {
                    // If the resource shouldn't be proxied, we need to enforce that on the annotation
                    endpoint.IsProxied = false;
                }
 
                var port = _options.Value.RandomizePorts && endpoint.IsProxied ? null : endpoint.Port;
                svc.Spec.Port = port;
                svc.Spec.Protocol = PortProtocol.FromProtocolType(endpoint.Protocol);
                svc.Spec.Address = endpoint.TargetHost switch
                {
                    "*" or "+" => "0.0.0.0",
                    _ => endpoint.TargetHost
                };
                svc.Spec.AddressAllocationMode = endpoint.IsProxied ? AddressAllocationModes.Localhost : AddressAllocationModes.Proxyless;
 
                // So we can associate the service with the resource that produced it and the endpoint it represents.
                svc.Annotate(CustomResource.ResourceNameAnnotation, sp.ModelResource.Name);
                svc.Annotate(CustomResource.EndpointNameAnnotation, endpoint.Name);
 
                _appResources.Add(new ServiceAppResource(sp.ModelResource, svc, endpoint));
            }
        }
    }
 
    private void PrepareExecutables()
    {
        PrepareProjectExecutables();
        PreparePlainExecutables();
    }
 
    private void PreparePlainExecutables()
    {
        var modelExecutableResources = _model.GetExecutableResources();
 
        foreach (var executable in modelExecutableResources)
        {
            EnsureRequiredAnnotations(executable);
 
            var exeInstance = GetDcpInstance(executable, instanceIndex: 0);
            var exePath = executable.Command;
            var exe = Executable.Create(exeInstance.Name, exePath);
 
            // The working directory is always relative to the app host project directory (if it exists).
            exe.Spec.WorkingDirectory = executable.WorkingDirectory;
            exe.Spec.ExecutionType = ExecutionType.Process;
            exe.Annotate(CustomResource.OtelServiceNameAnnotation, executable.Name);
            exe.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, exeInstance.Suffix);
            exe.Annotate(CustomResource.ResourceNameAnnotation, executable.Name);
            SetInitialResourceState(executable, exe);
 
            var exeAppResource = new AppResource(executable, exe);
            AddServicesProducedInfo(executable, exe, exeAppResource);
            _appResources.Add(exeAppResource);
        }
    }
 
    private void PrepareProjectExecutables()
    {
        var modelProjectResources = _model.GetProjectResources();
 
        foreach (var project in modelProjectResources)
        {
            if (!project.TryGetLastAnnotation<IProjectMetadata>(out var projectMetadata))
            {
                throw new InvalidOperationException("A project resource is missing required metadata"); // Should never happen.
            }
 
            EnsureRequiredAnnotations(project);
 
            var replicas = project.GetReplicaCount();
 
            for (var i = 0; i < replicas; i++)
            {
                var exeInstance = GetDcpInstance(project, instanceIndex: i);
                var exeSpec = Executable.Create(exeInstance.Name, "dotnet");
                exeSpec.Spec.WorkingDirectory = Path.GetDirectoryName(projectMetadata.ProjectPath);
 
                exeSpec.Annotate(CustomResource.OtelServiceNameAnnotation, project.Name);
                exeSpec.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, exeInstance.Suffix);
                exeSpec.Annotate(CustomResource.ResourceNameAnnotation, project.Name);
                exeSpec.Annotate(CustomResource.ResourceReplicaCount, replicas.ToString(CultureInfo.InvariantCulture));
                exeSpec.Annotate(CustomResource.ResourceReplicaIndex, i.ToString(CultureInfo.InvariantCulture));
 
                SetInitialResourceState(project, exeSpec);
 
                var projectLaunchConfiguration = new ProjectLaunchConfiguration();
                projectLaunchConfiguration.ProjectPath = projectMetadata.ProjectPath;
 
                if (!string.IsNullOrEmpty(_configuration[DebugSessionPortVar]))
                {
                    exeSpec.Spec.ExecutionType = ExecutionType.IDE;
 
                    projectLaunchConfiguration.DisableLaunchProfile = project.TryGetLastAnnotation<ExcludeLaunchProfileAnnotation>(out _);
                    if (!projectLaunchConfiguration.DisableLaunchProfile && project.TryGetLastAnnotation<LaunchProfileAnnotation>(out var lpa))
                    {
                        projectLaunchConfiguration.LaunchProfile = lpa.LaunchProfileName;
                    }
                }
                else
                {
                    exeSpec.Spec.ExecutionType = ExecutionType.Process;
                    if (_configuration.GetBool("DOTNET_WATCH") is not true)
                    {
                        exeSpec.Spec.Args = [
                            "run",
                            "--no-build",
                            "--project",
                            projectMetadata.ProjectPath,
                        ];
                    }
                    else
                    {
                        exeSpec.Spec.Args = [
                            "watch",
                            "--non-interactive",
                            "--no-hot-reload",
                            "--project",
                            projectMetadata.ProjectPath
                        ];
                    }
 
                    if (!string.IsNullOrEmpty(_distributedApplicationOptions.Configuration))
                    {
                        exeSpec.Spec.Args.AddRange(new[] { "-c", _distributedApplicationOptions.Configuration });
                    }
 
                    // We pretty much always want to suppress the normal launch profile handling
                    // because the settings from the profile will override the ambient environment settings, which is not what we want
                    // (the ambient environment settings for service processes come from the application model
                    // and should be HIGHER priority than the launch profile settings).
                    // This means we need to apply the launch profile settings manually--the invocation parameters here,
                    // and the environment variables/application URLs inside CreateExecutableAsync().
                    exeSpec.Spec.Args.Add("--no-launch-profile");
 
                    var launchProfile = project.GetEffectiveLaunchProfile()?.LaunchProfile;
                    if (launchProfile is not null && !string.IsNullOrWhiteSpace(launchProfile.CommandLineArgs))
                    {
                        var cmdArgs = CommandLineArgsParser.Parse(launchProfile.CommandLineArgs);
                        if (cmdArgs.Count > 0)
                        {
                            exeSpec.Spec.Args.Add("--");
                            exeSpec.Spec.Args.AddRange(cmdArgs);
                        }
                    }
                }
 
                // We want this annotation even if we are not using IDE execution; see ToSnapshot() for details.
                exeSpec.AnnotateAsObjectList(Executable.LaunchConfigurationsAnnotation, projectLaunchConfiguration);
 
                var exeAppResource = new AppResource(project, exeSpec);
                AddServicesProducedInfo(project, exeSpec, exeAppResource);
                _appResources.Add(exeAppResource);
            }
        }
    }
 
    private void EnsureRequiredAnnotations(IResource resource)
    {
        // Add the default lifecycle commands (start/stop/restart)
        resource.AddLifeCycleCommands();
 
        _nameGenerator.EnsureDcpInstancesPopulated(resource);
    }
 
    private static void SetInitialResourceState(IResource resource, IAnnotationHolder annotationHolder)
    {
        // Store the initial state of the resource
        if (resource.TryGetLastAnnotation<ResourceSnapshotAnnotation>(out var initial) &&
            initial.InitialSnapshot.State?.Text is string state && !string.IsNullOrEmpty(state))
        {
            annotationHolder.Annotate(CustomResource.ResourceStateAnnotation, state);
        }
    }
 
    private Task CreateExecutablesAsync(IEnumerable<AppResource> executableResources, CancellationToken cancellationToken)
    {
        try
        {
            AspireEventSource.Instance.DcpExecutablesCreateStart();
 
            async Task CreateResourceExecutablesAsyncCore(IResource resource, IEnumerable<AppResource> executables, CancellationToken cancellationToken)
            {
                var resourceLogger = _loggerService.GetLogger(resource);
                var resourceType = resource is ProjectResource ? KnownResourceTypes.Project : KnownResourceTypes.Executable;
 
                try
                {
                    await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, resourceType, resource, DcpResourceName: null)).ConfigureAwait(false);
 
                    foreach (var er in executables)
                    {
                        try
                        {
                            await CreateExecutableAsync(er, resourceLogger, cancellationToken).ConfigureAwait(false);
                        }
                        catch (FailedToApplyEnvironmentException)
                        {
                            // For this exception we don't want the noise of the stack trace, we've already
                            // provided more detail where we detected the issue (e.g. envvar name). To get
                            // more diagnostic information reduce logging level for DCP log category to Debug.
                            await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, er.ModelResource, er.DcpResource.Metadata.Name)).ConfigureAwait(false);
                        }
                        catch (Exception ex)
                        {
                            // The purpose of this catch block is to ensure that if an individual executable resource fails
                            // to start that it doesn't tear down the entire app host AND that we route the error to the
                            // appropriate replica.
                            resourceLogger.LogError(ex, "Failed to create resource {ResourceName}", er.ModelResource.Name);
                            await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, er.ModelResource, er.DcpResource.Metadata.Name)).ConfigureAwait(false);
                        }
                    }
                }
                catch (Exception ex)
                {
                    // The purpose of this catch block is to ensure that if an error processing the overall
                    // configuration of the executable resource files. This is different to the exception handling
                    // block above because at this stage of processing we don't necessarily have any replicas
                    // yet. For example if a dependency fails to start.
                    resourceLogger.LogError(ex, "Failed to create resource {ResourceName}", resource.Name);
                    await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, resource, DcpResourceName: null)).ConfigureAwait(false);
                }
            }
 
            var tasks = new List<Task>();
            foreach (var group in executableResources.GroupBy(e => e.ModelResource))
            {
                tasks.Add(CreateResourceExecutablesAsyncCore(group.Key, group, cancellationToken));
            }
 
            return Task.WhenAll(tasks);
        }
        finally
        {
            AspireEventSource.Instance.DcpExecutablesCreateStop();
        }
    }
 
    private async Task CreateExecutableAsync(AppResource er, ILogger resourceLogger, CancellationToken cancellationToken)
    {
        ExecutableSpec spec;
        Func<Task<CustomResource>> createResource;
 
        switch (er.DcpResource)
        {
            case Executable exe:
                spec = exe.Spec;
                createResource = async () => await _kubernetesService.CreateAsync(exe, cancellationToken).ConfigureAwait(false);
                break;
            default:
                throw new InvalidOperationException($"Expected an Executable resource, but got {er.DcpResource.Kind} instead");
        }
 
        var failedToApplyArgs = false;
        spec.Args ??= [];
 
        if (er.ModelResource.TryGetAnnotationsOfType<CommandLineArgsCallbackAnnotation>(out var exeArgsCallbacks))
        {
            var args = new List<object>();
            var commandLineContext = new CommandLineArgsCallbackContext(args, cancellationToken);
 
            foreach (var exeArgsCallback in exeArgsCallbacks)
            {
                await exeArgsCallback.Callback(commandLineContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                try
                {
                    var value = arg switch
                    {
                        string s => s,
                        IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: false, cancellationToken).ConfigureAwait(false),
                        null => null,
                        _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                    };
 
                    if (value is not null)
                    {
                        spec.Args.Add(value);
                    }
                }
                catch (Exception ex)
                {
                    resourceLogger.LogCritical(ex, "Failed to apply argument '{ConfigKey}'. A dependency may have failed to start.", arg);
                    _logger.LogDebug(ex, "Failed to apply argument '{ConfigKey}' to '{ResourceName}'. A dependency may have failed to start.", arg, er.ModelResource.Name);
                    failedToApplyArgs = true;
                }
            }
        }
 
        var config = new Dictionary<string, object>();
        var context = new EnvironmentCallbackContext(_executionContext, config, cancellationToken)
        {
            Logger = resourceLogger
        };
 
        if (er.ModelResource.TryGetEnvironmentVariables(out var envVarAnnotations))
        {
            foreach (var ann in envVarAnnotations)
            {
                await ann.Callback(context).ConfigureAwait(false);
            }
        }
 
        var failedToApplyConfiguration = false;
        spec.Env = [];
        foreach (var c in config)
        {
            try
            {
                var value = c.Value switch
                {
                    string s => s,
                    IValueProvider valueProvider => await GetValue(c.Key, valueProvider, resourceLogger, isContainer: false, cancellationToken).ConfigureAwait(false),
                    null => null,
                    _ => throw new InvalidOperationException($"Unexpected value for environment variable \"{c.Key}\".")
                };
 
                if (value is not null)
                {
                    spec.Env.Add(new EnvVar { Name = c.Key, Value = value });
                }
            }
            catch (Exception ex)
            {
                resourceLogger.LogCritical(ex, "Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", c.Key);
                _logger.LogDebug(ex, "Failed to apply configuration value '{ConfigKey}' to '{ResourceName}'. A dependency may have failed to start.", c.Key, er.ModelResource.Name);
                failedToApplyConfiguration = true;
            }
        }
 
        if (failedToApplyConfiguration || failedToApplyArgs)
        {
            throw new FailedToApplyEnvironmentException();
        }
 
        await createResource().ConfigureAwait(false);
    }
 
    private async Task<string?> GetValue(string? key, IValueProvider valueProvider, ILogger logger, bool isContainer, CancellationToken cancellationToken)
    {
        var task = ExpressionResolver.ResolveAsync(isContainer, valueProvider, DefaultContainerHostName, cancellationToken);
 
        if (!task.IsCompleted)
        {
            if (valueProvider is IResource resource)
            {
                if (key is null)
                {
                    logger.LogInformation("Waiting for value from resource '{ResourceName}'", resource.Name);
                }
                else
                {
                    logger.LogInformation("Waiting for value for environment variable value '{Name}' from resource '{ResourceName}'", key, resource.Name);
                }
            }
            else if (valueProvider is ConnectionStringReference { Resource: var cs })
            {
                logger.LogInformation("Waiting for value for connection string from resource '{ResourceName}'", cs.Name);
            }
            else
            {
                if (key is null)
                {
                    logger.LogInformation("Waiting for value from {ValueProvider}.", valueProvider.ToString());
                }
                else
                {
                    logger.LogInformation("Waiting for value for environment variable value '{Name}' from {ValueProvider}.", key, valueProvider.ToString());
                }
            }
        }
 
        return await task.ConfigureAwait(false);
    }
 
    private void PrepareContainers()
    {
        var modelContainerResources = _model.GetContainerResources();
 
        foreach (var container in modelContainerResources)
        {
            if (!container.TryGetContainerImageName(out var containerImageName))
            {
                // This should never happen! In order to get into this loop we need
                // to have the annotation, if we don't have the annotation by the time
                // we get here someone is doing something wrong.
                throw new InvalidOperationException();
            }
 
            EnsureRequiredAnnotations(container);
 
            var containerObjectInstance = GetDcpInstance(container, instanceIndex: 0);
            var ctr = Container.Create(containerObjectInstance.Name, containerImageName);
 
            ctr.Spec.ContainerName = containerObjectInstance.Name; // Use the same name for container orchestrator (Docker, Podman) resource and DCP object name.
 
            if (container.GetContainerLifetimeType() == ContainerLifetime.Persistent)
            {
                ctr.Spec.Persistent = true;
            }
 
            ctr.Annotate(CustomResource.ResourceNameAnnotation, container.Name);
            ctr.Annotate(CustomResource.OtelServiceNameAnnotation, container.Name);
            ctr.Annotate(CustomResource.OtelServiceInstanceIdAnnotation, containerObjectInstance.Suffix);
            SetInitialResourceState(container, ctr);
 
            if (container.TryGetContainerMounts(out var containerMounts))
            {
                ctr.Spec.VolumeMounts = [];
 
                foreach (var mount in containerMounts)
                {
                    var volumeSpec = new VolumeMount
                    {
                        Source = mount.Source,
                        Target = mount.Target,
                        Type = mount.Type == ContainerMountType.BindMount ? VolumeMountType.Bind : VolumeMountType.Volume,
                        IsReadOnly = mount.IsReadOnly
                    };
 
                    ctr.Spec.VolumeMounts.Add(volumeSpec);
                }
            }
 
            ctr.Spec.Networks = new List<ContainerNetworkConnection>
            {
                new ContainerNetworkConnection
                {
                    Name = DefaultAspireNetworkName,
                    Aliases = new List<string> { container.Name },
                }
            };
 
            var containerAppResource = new AppResource(container, ctr);
            AddServicesProducedInfo(container, ctr, containerAppResource);
            _appResources.Add(containerAppResource);
        }
    }
 
    /// <summary>
    /// Gets information about the resource's DCP instance. ReplicaInstancesAnnotation is added in BeforeStartEvent.
    /// </summary>
    private static DcpInstance GetDcpInstance(IResource resource, int instanceIndex)
    {
        if (!resource.TryGetLastAnnotation<DcpInstancesAnnotation>(out var replicaAnnotation))
        {
            throw new DistributedApplicationException($"Couldn't find required {nameof(DcpInstancesAnnotation)} annotation on resource {resource.Name}.");
        }
 
        foreach (var instance in replicaAnnotation.Instances)
        {
            if (instance.Index == instanceIndex)
            {
                return instance;
            }
        }
 
        throw new DistributedApplicationException($"Couldn't find required instance ID for index {instanceIndex} on resource {resource.Name}.");
    }
 
    private Task CreateContainersAsync(IEnumerable<AppResource> containerResources, CancellationToken cancellationToken)
    {
        try
        {
            AspireEventSource.Instance.DcpContainersCreateStart();
 
            async Task CreateContainerAsyncCore(AppResource cr, CancellationToken cancellationToken)
            {
                var logger = _loggerService.GetLogger(cr.ModelResource);
 
                try
                {
                    await CreateContainerAsync(cr, logger, cancellationToken).ConfigureAwait(false);
                }
                catch (FailedToApplyEnvironmentException)
                {
                    // For this exception we don't want the noise of the stack trace, we've already
                    // provided more detail where we detected the issue (e.g. envvar name). To get
                    // more diagnostic information reduce logging level for DCP log category to Debug.
                    await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, KnownResourceTypes.Container, cr.ModelResource, cr.DcpResource.Metadata.Name)).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "Failed to create container resource {ResourceName}", cr.ModelResource.Name);
                    await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, KnownResourceTypes.Container, cr.ModelResource, cr.DcpResource.Metadata.Name)).ConfigureAwait(false);
                }
            }
 
            var tasks = new List<Task>();
 
            // Create a custom container network for Aspire if there are container resources
            if (containerResources.Any())
            {
                // The network will be created with a unique postfix to avoid conflicts with other Aspire AppHost networks
                tasks.Add(_kubernetesService.CreateAsync(ContainerNetwork.Create(DefaultAspireNetworkName), cancellationToken));
            }
 
            foreach (var cr in containerResources)
            {
                tasks.Add(CreateContainerAsyncCore(cr, cancellationToken));
            }
 
            return Task.WhenAll(tasks);
        }
        finally
        {
            AspireEventSource.Instance.DcpContainersCreateStop();
        }
    }
 
    private async Task CreateContainerAsync(AppResource cr, ILogger resourceLogger, CancellationToken cancellationToken)
    {
        await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, KnownResourceTypes.Container, cr.ModelResource, cr.DcpResource.Metadata.Name)).ConfigureAwait(false);
 
        var dcpContainerResource = (Container)cr.DcpResource;
        var modelContainerResource = cr.ModelResource;
 
        await ApplyBuildArgumentsAsync(dcpContainerResource, modelContainerResource, cancellationToken).ConfigureAwait(false);
 
        var config = new Dictionary<string, object>();
 
        dcpContainerResource.Spec.Env = [];
 
        if (cr.ServicesProduced.Count > 0)
        {
            dcpContainerResource.Spec.Ports = new();
 
            foreach (var sp in cr.ServicesProduced)
            {
                var ea = sp.EndpointAnnotation;
 
                var portSpec = new ContainerPortSpec()
                {
                    ContainerPort = ea.TargetPort,
                };
 
                if (!ea.IsProxied && ea.Port is int)
                {
                    portSpec.HostPort = ea.Port;
                }
 
                switch (sp.EndpointAnnotation.Protocol)
                {
                    case ProtocolType.Tcp:
                        portSpec.Protocol = PortProtocol.TCP; break;
                    case ProtocolType.Udp:
                        portSpec.Protocol = PortProtocol.UDP; break;
                }
 
                dcpContainerResource.Spec.Ports.Add(portSpec);
            }
        }
 
        if (modelContainerResource.TryGetEnvironmentVariables(out var containerEnvironmentVariables))
        {
            var context = new EnvironmentCallbackContext(_executionContext, config, cancellationToken);
 
            foreach (var v in containerEnvironmentVariables)
            {
                await v.Callback(context).ConfigureAwait(false);
            }
        }
 
        var failedToApplyConfiguration = false;
        foreach (var kvp in config)
        {
            try
            {
                var value = kvp.Value switch
                {
                    string s => s,
                    IValueProvider valueProvider => await GetValue(kvp.Key, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                    null => null,
                    _ => throw new InvalidOperationException($"Unexpected value for environment variable \"{kvp.Key}\".")
                };
 
                if (value is not null)
                {
                    dcpContainerResource.Spec.Env.Add(new EnvVar { Name = kvp.Key, Value = value });
                }
            }
            catch (Exception ex)
            {
                resourceLogger.LogCritical(ex, "Failed to apply configuration value '{ConfigKey}'. A dependency may have failed to start.", kvp.Key);
                _logger.LogDebug(ex, "Failed to apply configuration value '{ConfigKey}' to '{ResourceName}'. A dependency may have failed to start.", kvp.Key, modelContainerResource.Name);
                failedToApplyConfiguration = true;
            }
        }
 
        // Apply optional extra arguments to the container run command.
        if (modelContainerResource.TryGetAnnotationsOfType<ContainerRuntimeArgsCallbackAnnotation>(out var runArgsCallback))
        {
            dcpContainerResource.Spec.RunArgs ??= [];
 
            var args = new List<object>();
 
            var containerRunArgsContext = new ContainerRuntimeArgsCallbackContext(args, cancellationToken);
 
            foreach (var callback in runArgsCallback)
            {
                await callback.Callback(containerRunArgsContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                try
                {
                    var value = arg switch
                    {
                        string s => s,
                        IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                        null => null,
                        _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                    };
 
                    if (value is not null)
                    {
                        dcpContainerResource.Spec.RunArgs.Add(value);
                    }
                }
                catch (Exception ex)
                {
                    resourceLogger.LogCritical(ex, "Failed to apply container runtime argument '{ConfigKey}'. A dependency may have failed to start.", arg);
                    _logger.LogDebug(ex, "Failed to apply container runtime argument '{ConfigKey}' to '{ResourceName}'. A dependency may have failed to start.", arg, modelContainerResource.Name);
                    failedToApplyConfiguration = true;
                }
            }
        }
 
        var failedToApplyArgs = false;
        if (modelContainerResource.TryGetAnnotationsOfType<CommandLineArgsCallbackAnnotation>(out var argsCallback))
        {
            dcpContainerResource.Spec.Args ??= [];
 
            var args = new List<object>();
 
            var commandLineArgsContext = new CommandLineArgsCallbackContext(args, cancellationToken);
 
            foreach (var callback in argsCallback)
            {
                await callback.Callback(commandLineArgsContext).ConfigureAwait(false);
            }
 
            foreach (var arg in args)
            {
                try
                {
                    var value = arg switch
                    {
                        string s => s,
                        IValueProvider valueProvider => await GetValue(key: null, valueProvider, resourceLogger, isContainer: true, cancellationToken).ConfigureAwait(false),
                        null => null,
                        _ => throw new InvalidOperationException($"Unexpected value for {arg}")
                    };
 
                    if (value is not null)
                    {
                        dcpContainerResource.Spec.Args.Add(value);
                    }
                }
                catch (Exception ex)
                {
                    resourceLogger.LogCritical(ex, "Failed to apply container argument '{ConfigKey}'. A dependency may have failed to start.", arg);
                    _logger.LogDebug(ex, "Failed to apply container argument '{ConfigKey}' to '{ResourceName}'. A dependency may have failed to start.", arg, modelContainerResource.Name);
                    failedToApplyArgs = true;
                }
            }
        }
 
        if (modelContainerResource is ContainerResource containerResource)
        {
            dcpContainerResource.Spec.Command = containerResource.Entrypoint;
        }
 
        if (failedToApplyArgs || failedToApplyConfiguration)
        {
            throw new FailedToApplyEnvironmentException();
        }
 
        if (_dcpInfo is not null)
        {
            DcpDependencyCheck.CheckDcpInfoAndLogErrors(resourceLogger, _options.Value, _dcpInfo);
        }
 
        await _kubernetesService.CreateAsync(dcpContainerResource, cancellationToken).ConfigureAwait(false);
    }
 
    private static async Task ApplyBuildArgumentsAsync(Container dcpContainerResource, IResource modelContainerResource, CancellationToken cancellationToken)
    {
        if (modelContainerResource.Annotations.OfType<DockerfileBuildAnnotation>().SingleOrDefault() is { } dockerfileBuildAnnotation)
        {
            var dcpBuildArgs = new List<EnvVar>();
 
            foreach (var buildArgument in dockerfileBuildAnnotation.BuildArguments)
            {
                var valueString = buildArgument.Value switch
                {
                    string stringValue => stringValue,
                    IValueProvider valueProvider => await valueProvider.GetValueAsync(cancellationToken).ConfigureAwait(false),
                    bool boolValue => boolValue ? "true" : "false",
                    null => null,
                    _ => buildArgument.Value.ToString()
                };
 
                var dcpBuildArg = new EnvVar()
                {
                    Name = buildArgument.Key,
                    Value = valueString
                };
 
                dcpBuildArgs.Add(dcpBuildArg);
            }
 
            dcpContainerResource.Spec.Build = new()
            {
                Context = dockerfileBuildAnnotation.ContextPath,
                Dockerfile = dockerfileBuildAnnotation.DockerfilePath,
                Stage = dockerfileBuildAnnotation.Stage,
                Args = dcpBuildArgs
            };
 
            var dcpBuildSecrets = new List<BuildContextSecret>();
 
            foreach (var buildSecret in dockerfileBuildAnnotation.BuildSecrets)
            {
                var valueString = buildSecret.Value switch
                {
                    FileInfo filePath => filePath.FullName,
                    IValueProvider valueProvider => await valueProvider.GetValueAsync(cancellationToken).ConfigureAwait(false),
                    _ => throw new InvalidOperationException("Build secret can only be a parameter or a file.")
                };
 
                if (buildSecret.Value is FileInfo)
                {
                    var dcpBuildSecret = new BuildContextSecret
                    {
                        Id = buildSecret.Key,
                        Type = "file",
                        Source = valueString
                    };
                    dcpBuildSecrets.Add(dcpBuildSecret);
                }
                else
                {
                    var dcpBuildSecret = new BuildContextSecret
                    {
                        Id = buildSecret.Key,
                        Type = "env",
                        Value = valueString
                    };
                    dcpBuildSecrets.Add(dcpBuildSecret);
                }
            }
 
            dcpContainerResource.Spec.Build = new()
            {
                Context = dockerfileBuildAnnotation.ContextPath,
                Dockerfile = dockerfileBuildAnnotation.DockerfilePath,
                Stage = dockerfileBuildAnnotation.Stage,
                Args = dcpBuildArgs,
                Secrets = dcpBuildSecrets
            };
        }
    }
 
    private void AddServicesProducedInfo(IResource modelResource, IAnnotationHolder dcpResource, AppResource appResource)
    {
        var modelResourceName = "(unknown)";
        try
        {
            modelResourceName = DcpNameGenerator.GetObjectNameForResource(modelResource, _options.Value);
        }
        catch { } // For error messages only, OK to fall back to (unknown)
 
        var servicesProduced = _appResources.OfType<ServiceAppResource>().Where(r => r.ModelResource == modelResource);
        foreach (var sp in servicesProduced)
        {
            var ea = sp.EndpointAnnotation;
 
            if (modelResource.IsContainer())
            {
                if (ea.TargetPort is null)
                {
                    throw new InvalidOperationException($"The endpoint '{ea.Name}' for container resource '{modelResourceName}' must specify the {nameof(EndpointAnnotation.TargetPort)} value");
                }
            }
            else if (!ea.IsProxied)
            {
                if (HasMultipleReplicas(appResource.DcpResource))
                {
                    throw new InvalidOperationException($"Resource '{modelResourceName}' uses multiple replicas and a proxy-less endpoint '{ea.Name}'. These features do not work together.");
                }
 
                if (ea.Port is int && ea.Port != ea.TargetPort)
                {
                    throw new InvalidOperationException($"The endpoint '{ea.Name}' for resource '{modelResourceName}' is not using a proxy, and it has a value of {nameof(EndpointAnnotation.Port)} property that is different from the value of {nameof(EndpointAnnotation.TargetPort)} property. For proxy-less endpoints they must match.");
                }
            }
            else
            {
                Debug.Assert(ea.IsProxied);
 
                if (ea.TargetPort is int && ea.Port is int && ea.TargetPort == ea.Port)
                {
                    throw new InvalidOperationException(
                        $"The endpoint '{ea.Name}' for resource '{modelResourceName}' requested a proxy ({nameof(ea.IsProxied)} is true). Non-container resources cannot be proxied when both {nameof(ea.TargetPort)} and {nameof(ea.Port)} are specified with the same value.");
                }
 
                if (HasMultipleReplicas(appResource.DcpResource) && ea.TargetPort is int)
                {
                    throw new InvalidOperationException(
                        $"Resource '{modelResourceName}' can have multiple replicas, and it uses endpoint '{ea.Name}' that has {nameof(ea.TargetPort)} property set. Each replica must have a unique port; setting {nameof(ea.TargetPort)} is not allowed.");
                }
            }
 
            var spAnn = new ServiceProducerAnnotation(sp.Service.Metadata.Name);
            spAnn.Port = ea.TargetPort;
            dcpResource.AnnotateAsObjectList(CustomResource.ServiceProducerAnnotation, spAnn);
            appResource.ServicesProduced.Add(sp);
        }
 
        static bool HasMultipleReplicas(CustomResource resource)
        {
            if (resource is Executable exe && exe.Metadata.Annotations.TryGetValue(CustomResource.ResourceReplicaCount, out var value) && int.TryParse(value, CultureInfo.InvariantCulture, out var replicas) && replicas > 1)
            {
                return true;
            }
            return false;
        }
    }
 
    private async Task CreateResourcesAsync<RT>(CancellationToken cancellationToken) where RT : CustomResource
    {
        try
        {
            var resourcesToCreate = _appResources.Select(r => r.DcpResource).OfType<RT>();
            if (!resourcesToCreate.Any())
            {
                return;
            }
 
            // CONSIDER batched creation
            foreach (var res in resourcesToCreate)
            {
                await _kubernetesService.CreateAsync(res, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException ex)
        {
            // We catch and suppress the OperationCancelledException because the user may CTRL-C
            // during start up of the resources.
            _logger.LogDebug(ex, "Cancellation during creation of resources.");
        }
    }
 
    /// <summary>
    /// Create a patch update using the specified resource.
    /// A copy is taken of the resource to avoid permanently changing it.
    /// </summary>
    private static V1Patch CreatePatch<T>(T obj, Action<T> change) where T : CustomResource
    {
        // This method isn't very efficient.
        // If mass or frequent patches are required then we may want to create patches manually.
        var current = JsonSerializer.SerializeToNode(obj);
 
        var copy = JsonSerializer.Deserialize<T>(current)!;
        change(copy);
 
        var changed = JsonSerializer.SerializeToNode(copy);
 
        var jsonPatch = current.CreatePatch(changed);
        return new V1Patch(jsonPatch, V1Patch.PatchType.JsonPatch);
    }
 
    public async Task StopResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken)
    {
        var appResource = (AppResource)resourceReference;
 
        V1Patch patch;
        switch (appResource.DcpResource)
        {
            case Container c:
                patch = CreatePatch(c, obj => obj.Spec.Stop = true);
                await _kubernetesService.PatchAsync(c, patch, cancellationToken).ConfigureAwait(false);
                break;
            case Executable e:
                patch = CreatePatch(e, obj => obj.Spec.Stop = true);
                await _kubernetesService.PatchAsync(e, patch, cancellationToken).ConfigureAwait(false);
                break;
            default:
                throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
        }
    }
 
    public IResourceReference GetResource(string resourceName)
    {
        var matchingResource = _appResources
            .Where(r => r.DcpResource is not Service)
            .SingleOrDefault(r => string.Equals(r.DcpResource.Metadata.Name, resourceName, StringComparisons.ResourceName));
        if (matchingResource == null)
        {
            throw new InvalidOperationException($"Resource '{resourceName}' not found.");
        }
 
        return matchingResource;
    }
 
    public async Task StartResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken)
    {
        var appResource = (AppResource)resourceReference;
        var resourceType = GetResourceType(appResource.DcpResource, appResource.ModelResource);
 
        try
        {
            switch (appResource.DcpResource)
            {
                case Container c:
                    await StartExecutableOrContainerAsync(c).ConfigureAwait(false);
                    break;
                case Executable e:
                    await StartExecutableOrContainerAsync(e).ConfigureAwait(false);
                    break;
                default:
                    throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to start resource {ResourceName}", appResource.ModelResource.Name);
            await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, appResource.ModelResource, appResource.DcpResource.Metadata.Name)).ConfigureAwait(false);
            throw;
        }
 
        async Task StartExecutableOrContainerAsync<T>(T resource) where T : CustomResource
        {
            var resourceName = resource.Metadata.Name;
            _logger.LogDebug("Starting {ResourceType} '{ResourceName}'.", typeof(T).Name, resourceName);
 
            var resourceNotFound = false;
            try
            {
                await _kubernetesService.DeleteAsync<T>(resourceName, cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch (HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
            {
                // No-op if the resource wasn't found.
                // This could happen in a race condition, e.g. double clicking start button.
                resourceNotFound = true;
            }
 
            // Ensure resource is deleted. DeleteAsync returns before the resource is completely deleted so we must poll
            // to discover when it is safe to recreate the resource. This is required because the resources share the same name.
            // Deleting a resource might take a while (more than 10 seconds), because DCP tries to gracefully shut it down first
            // before resorting to more extreme measures.
            if (!resourceNotFound)
            {
                await DeleteResourceRetryPipeline.ExecuteAsync(async (state, attemptCancellationToken) =>
                {
                    try
                    {
                        await _kubernetesService.GetAsync<T>(state, cancellationToken: attemptCancellationToken).ConfigureAwait(false);
                        throw new DistributedApplicationException($"Failed to delete '{state}' successfully before restart.");
                    }
                    catch (HttpOperationException ex) when (ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
                    {
                        // Success.
                    }
                }, resource.Metadata.Name, cancellationToken).ConfigureAwait(false);
            }
 
            // Raise event after resource has been deleted. This is required because the event sets the status to "Starting" and resources being
            // deleted will temporarily override the status to a terminal state, such as "Exited".
            await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, resourceType, appResource.ModelResource, appResource.DcpResource.Metadata.Name)).ConfigureAwait(false);
 
            await _kubernetesService.CreateAsync(resource, cancellationToken).ConfigureAwait(false);
        }
    }
}