File: Pipelines\DistributedApplicationPipeline.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
#pragma warning disable ASPIREPUBLISHERS001
#pragma warning disable ASPIREPIPELINES001
 
using System.Diagnostics;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Text;
using Aspire.Hosting.ApplicationModel;
 
namespace Aspire.Hosting.Pipelines;
 
[DebuggerDisplay("{ToString(),nq}")]
internal sealed class DistributedApplicationPipeline : IDistributedApplicationPipeline
{
    private readonly List<PipelineStep> _steps = [];
 
    public bool HasSteps => _steps.Count > 0;
 
    public void AddStep(string name,
        Func<DeployingContext, Task> action,
        object? dependsOn = null,
        object? requiredBy = null)
    {
        if (_steps.Any(s => s.Name == name))
        {
            throw new InvalidOperationException(
                $"A step with the name '{name}' has already been added to the pipeline.");
        }
 
        var step = new PipelineStep
        {
            Name = name,
            Action = action
        };
 
        if (dependsOn != null)
        {
            AddDependencies(step, dependsOn);
        }
 
        if (requiredBy != null)
        {
            AddRequiredBy(step, requiredBy);
        }
 
        _steps.Add(step);
    }
 
    private static void AddDependencies(PipelineStep step, object dependsOn)
    {
        if (dependsOn is string stepName)
        {
            step.DependsOn(stepName);
        }
        else if (dependsOn is IEnumerable<string> stepNames)
        {
            foreach (var name in stepNames)
            {
                step.DependsOn(name);
            }
        }
        else
        {
            throw new ArgumentException(
                $"The dependsOn parameter must be a string or IEnumerable<string>, but was {dependsOn.GetType().Name}.",
                nameof(dependsOn));
        }
    }
 
    private static void AddRequiredBy(PipelineStep step, object requiredBy)
    {
        if (requiredBy is string stepName)
        {
            step.RequiredBy(stepName);
        }
        else if (requiredBy is IEnumerable<string> stepNames)
        {
            foreach (var name in stepNames)
            {
                step.RequiredBy(name);
            }
        }
        else
        {
            throw new ArgumentException(
                $"The requiredBy parameter must be a string or IEnumerable<string>, but was {requiredBy.GetType().Name}.",
                nameof(requiredBy));
        }
    }
 
    public void AddStep(PipelineStep step)
    {
        if (_steps.Any(s => s.Name == step.Name))
        {
            throw new InvalidOperationException(
                $"A step with the name '{step.Name}' has already been added to the pipeline.");
        }
 
        _steps.Add(step);
    }
 
    public async Task ExecuteAsync(DeployingContext context)
    {
        var allSteps = _steps.Concat(CollectStepsFromAnnotations(context)).ToList();
 
        if (allSteps.Count == 0)
        {
            return;
        }
 
        ValidateSteps(allSteps);
 
        var stepsByName = allSteps.ToDictionary(s => s.Name);
 
        var levels = ResolveDependencies(allSteps, stepsByName);
 
        foreach (var level in levels)
        {
            var tasks = level.Select(step => ExecuteStepAsync(step, context)).ToList();
            try
            {
                await Task.WhenAll(tasks).ConfigureAwait(false);
            }
            catch
            {
                // Collect all exceptions from failed tasks
                var exceptions = tasks
                    .Where(t => t.IsFaulted)
                    .SelectMany(t => t.Exception?.InnerExceptions ?? Enumerable.Empty<Exception>())
                    .ToList();
 
                if (exceptions.Count == 1)
                {
                    ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
                }
                else if (exceptions.Count > 1)
                {
                    throw new AggregateException(
                        $"Multiple pipeline steps failed at the same level: {string.Join(", ", exceptions.OfType<InvalidOperationException>().Select(e => e.Message))}",
                        exceptions);
                }
 
                throw;
            }
        }
    }
 
    private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingContext context)
    {
        foreach (var resource in context.Model.Resources)
        {
            var annotations = resource.Annotations
                .OfType<PipelineStepAnnotation>();
 
            foreach (var annotation in annotations)
            {
                foreach (var step in annotation.CreateSteps())
                {
                    yield return step;
                }
            }
        }
    }
 
    private static void ValidateSteps(IEnumerable<PipelineStep> steps)
    {
        var stepNames = new HashSet<string>();
 
        foreach (var step in steps)
        {
            if (!stepNames.Add(step.Name))
            {
                throw new InvalidOperationException(
                    $"Duplicate step name: '{step.Name}'");
            }
        }
 
        foreach (var step in steps)
        {
            foreach (var dependency in step.DependsOnSteps)
            {
                if (!stepNames.Contains(dependency))
                {
                    throw new InvalidOperationException(
                        $"Step '{step.Name}' depends on unknown step '{dependency}'");
                }
            }
 
            foreach (var requiredBy in step.RequiredBySteps)
            {
                if (!stepNames.Contains(requiredBy))
                {
                    throw new InvalidOperationException(
                        $"Step '{step.Name}' is required by unknown step '{requiredBy}'");
                }
            }
        }
    }
 
    /// <summary>
    /// Resolves the dependencies among the steps and organizes them into levels for execution.
    /// </summary>
    /// <param name="steps">The complete set of pipeline steps populated from annotations and the builder</param>
    /// <param name="stepsByName">A dictionary mapping step names to their corresponding step objects</param>
    /// <returns>A list of lists where each list contains the steps to be executed at the same level</returns>
    private static List<List<PipelineStep>> ResolveDependencies(
        IEnumerable<PipelineStep> steps,
        Dictionary<string, PipelineStep> stepsByName)
    {
        // Initial a graph that represents a step and its dependencies
        // and an inDegree map to count the number of dependencies that
        // each step has.
        var graph = new Dictionary<string, List<string>>();
        var inDegree = new Dictionary<string, int>();
 
        foreach (var step in steps)
        {
            graph[step.Name] = [];
            inDegree[step.Name] = 0;
        }
 
        // Process all the `RequiredBy` relationships in the graph and adds
        // the each `RequiredBy` step to the DependsOn list of the step that requires it.
        foreach (var step in steps)
        {
            foreach (var requiredByStep in step.RequiredBySteps)
            {
                if (!graph.ContainsKey(requiredByStep))
                {
                    throw new InvalidOperationException(
                        $"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
                }
 
                if (stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj) &&
                    !requiredByStepObj.DependsOnSteps.Contains(step.Name))
                {
                    requiredByStepObj.DependsOnSteps.Add(step.Name);
                }
            }
        }
 
        // Now that the `DependsOn` lists are fully populated, we can build the graph
        // and the inDegree map based only on the DependOnSteps list.
        foreach (var step in steps)
        {
            foreach (var dependency in step.DependsOnSteps)
            {
                if (!graph.TryGetValue(dependency, out var dependents))
                {
                    throw new InvalidOperationException(
                        $"Step '{step.Name}' depends on unknown step '{dependency}'");
                }
 
                dependents.Add(step.Name);
                inDegree[step.Name]++;
            }
        }
 
        // Perform a topological sort to determine the levels of execution and
        // initialize a queue with all steps that have no dependencies (inDegree of 0)
        // and can be executed immediately as part of the first level.
        var levels = new List<List<PipelineStep>>();
        var queue = new Queue<string>(
            inDegree.Where(kvp => kvp.Value == 0).Select(kvp => kvp.Key)
        );
 
        // Process the queue until all steps have been organized into levels.
        // We start with the steps that have no dependencies and then iterate
        // through all the steps that depend on them to build out the graph
        // until no more steps are available to process.
        while (queue.Count > 0)
        {
            var currentLevel = new List<PipelineStep>();
            var levelSize = queue.Count;
 
            for (var i = 0; i < levelSize; i++)
            {
                var stepName = queue.Dequeue();
                var step = stepsByName[stepName];
                currentLevel.Add(step);
 
                // For each dependent step, reduce its inDegree by 1
                // in each iteration since its dependencies have been
                // processed. Once a dependent step has an inDegree
                // of 0, it means all its dependencies have been
                // processed and it can be added to the queue so we
                // can process the next level of dependencies.
                foreach (var dependent in graph[stepName])
                {
                    inDegree[dependent]--;
                    if (inDegree[dependent] == 0)
                    {
                        queue.Enqueue(dependent);
                    }
                }
            }
 
            // Exhausting the queue means that we've resolved all
            // steps that can run in parallel.
            levels.Add(currentLevel);
        }
 
        // If the total number of steps in all levels does not equal
        // the total number of steps in the pipeline, it indicates that
        // there is a circular dependency in the graph. Steps are enqueued
        // for processing into levels above when all their dependencies are
        // resolved. When a cycle exists, the degrees of the steps in the cycle
        // will never reach zero and won't be enqueued for processing so the
        // total number of processed steps will be less than the total number
        // of steps in the pipeline.
        if (levels.Sum(l => l.Count) != steps.Count())
        {
            var processedSteps = new HashSet<string>(levels.SelectMany(l => l.Select(s => s.Name)));
            var stepsInCycle = steps.Where(s => !processedSteps.Contains(s.Name)).Select(s => s.Name).ToList();
 
            throw new InvalidOperationException(
                $"Circular dependency detected in pipeline steps: {string.Join(", ", stepsInCycle)}");
        }
 
        return levels;
    }
 
    private static async Task ExecuteStepAsync(PipelineStep step, DeployingContext context)
    {
        try
        {
            await step.Action(context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            var exceptionInfo = ExceptionDispatchInfo.Capture(ex);
            throw new InvalidOperationException(
                $"Step '{step.Name}' failed: {ex.Message}", exceptionInfo.SourceException);
        }
    }
 
    public override string ToString()
    {
        if (_steps.Count == 0)
        {
            return "Pipeline: (empty)";
        }
 
        var sb = new StringBuilder();
        sb.AppendLine(CultureInfo.InvariantCulture, $"Pipeline with {_steps.Count} step(s):");
 
        foreach (var step in _steps)
        {
            sb.Append(CultureInfo.InvariantCulture, $"  - {step.Name}");
 
            if (step.DependsOnSteps.Count > 0)
            {
                sb.Append(CultureInfo.InvariantCulture, $" [depends on: {string.Join(", ", step.DependsOnSteps)}]");
            }
 
            if (step.RequiredBySteps.Count > 0)
            {
                sb.Append(CultureInfo.InvariantCulture, $" [required by: {string.Join(", ", step.RequiredBySteps)}]");
            }
 
            sb.AppendLine();
        }
 
        return sb.ToString();
    }
}