File: Publishing\PublishingActivityProgressReporter.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
#pragma warning disable ASPIREPUBLISHERS001
#pragma warning disable ASPIREINTERACTION001
 
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Threading.Channels;
using Aspire.Hosting.Backchannel;
using Aspire.Hosting.ApplicationModel;
 
namespace Aspire.Hosting.Publishing;
 
/// <summary>
/// Represents the completion state of a publishing activity (task, step, or top-level operation).
/// </summary>
[Experimental("ASPIREPUBLISHERS001", UrlFormat = "https://aka.ms/aspire/diagnostics/{0}")]
public enum CompletionState
{
    /// <summary>
    /// The task is in progress.
    /// </summary>
    InProgress,
 
    /// <summary>
    /// The task completed successfully.
    /// </summary>
    Completed,
 
    /// <summary>
    /// The task completed with warnings.
    /// </summary>
    CompletedWithWarning,
 
    /// <summary>
    /// The task completed with an error.
    /// </summary>
    CompletedWithError
}
 
/// <summary>
/// Represents a publishing step, which can contain multiple tasks.
/// </summary>
[Experimental("ASPIREPUBLISHERS001", UrlFormat = "https://aka.ms/aspire/diagnostics/{0}")]
public sealed class PublishingStep : IAsyncDisposable
{
    private readonly ConcurrentDictionary<string, PublishingTask> _tasks = new();
 
    internal PublishingStep(string id, string title)
    {
        Id = id;
        Title = title;
    }
 
    /// <summary>
    /// Unique Id of the step.
    /// </summary>
    public string Id { get; private set; }
 
    /// <summary>
    /// The title of the publishing step.
    /// </summary>
    public string Title { get; private set; }
 
    /// <summary>
    /// The completion state of the step. Defaults to InProgress.
    /// The state is only aggregated from child tasks during disposal.
    /// </summary>
    public CompletionState CompletionState
    {
        get => _completionState;
        internal set => _completionState = value;
    }
 
    private CompletionState _completionState = CompletionState.InProgress;
 
    /// <summary>
    /// The completion text for the step.
    /// </summary>
    public string CompletionText { get; internal set; } = string.Empty;
 
    /// <summary>
    /// The collection of child tasks belonging to this step.
    /// </summary>
    public IReadOnlyDictionary<string, PublishingTask> Tasks => _tasks;
 
    /// <summary>
    /// The progress reporter that created this step.
    /// </summary>
    internal IPublishingActivityProgressReporter? Reporter { get; set; }
 
    /// <summary>
    /// Adds a task to this step.
    /// </summary>
    internal void AddTask(PublishingTask task)
    {
        _tasks.TryAdd(task.Id, task);
    }
 
    /// <summary>
    /// Recalculates the completion state based on child tasks.
    /// </summary>
    internal CompletionState CalculateAggregatedState()
    {
        if (_tasks.IsEmpty)
        {
            return CompletionState.Completed;
        }
 
        var maxState = CompletionState.InProgress;
        foreach (var task in _tasks.Values)
        {
            if ((int)task.CompletionState > (int)maxState)
            {
                maxState = task.CompletionState;
            }
        }
        return maxState;
    }
 
    /// <summary>
    /// Creates a new task within this step.
    /// </summary>
    /// <param name="statusText">The initial status text for the task.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>The created task.</returns>
    public async Task<PublishingTask> CreateTaskAsync(string statusText, CancellationToken cancellationToken = default)
    {
        if (Reporter is null)
        {
            throw new InvalidOperationException("Cannot create task: Reporter is not set.");
        }
 
        return await Reporter.CreateTaskAsync(this, statusText, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Disposes the step and aggregates the final completion state from all child tasks.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Reporter is null)
        {
            return;
        }
 
        // Only complete the step if it's still in progress to avoid double completion
        if (CompletionState != CompletionState.InProgress)
        {
            return;
        }
 
        // Use the current completion state or calculate it from child tasks if still in progress
        var finalState = CalculateAggregatedState();
 
        // Only set completion text if it has not been explicitly set
        var completionText = string.IsNullOrEmpty(CompletionText)
            ? finalState switch
            {
                CompletionState.Completed => $"{Title} completed successfully",
                CompletionState.CompletedWithWarning => $"{Title} completed with warnings",
                CompletionState.CompletedWithError => $"{Title} completed with errors",
                _ => $"{Title} completed"
            }
            : CompletionText;
 
        await Reporter.CompleteStepAsync(this, completionText, finalState, CancellationToken.None).ConfigureAwait(false);
    }
}
 
/// <summary>
/// Represents a publishing task, which belongs to a step.
/// </summary>
[Experimental("ASPIREPUBLISHERS001", UrlFormat = "https://aka.ms/aspire/diagnostics/{0}")]
public sealed class PublishingTask : IAsyncDisposable
{
    internal PublishingTask(string id, string stepId, string statusText, PublishingStep parentStep)
    {
        Id = id;
        StepId = stepId;
        StatusText = statusText;
        ParentStep = parentStep;
    }
    /// <summary>
    /// Unique Id of the task.
    /// </summary>
    public string Id { get; private set; }
 
    /// <summary>
    /// The identifier of the step this task belongs to.
    /// </summary>
    public string StepId { get; private set; }
 
    /// <summary>
    /// Reference to the parent step this task belongs to.
    /// </summary>
    public PublishingStep ParentStep { get; internal set; }
 
    /// <summary>
    /// The current status text of the task.
    /// </summary>
    public string StatusText { get; internal set; }
 
    /// <summary>
    /// The completion state of the task.
    /// </summary>
    public CompletionState CompletionState { get; internal set; } = CompletionState.InProgress;
 
    /// <summary>
    /// Optional completion message for the task.
    /// </summary>
    public string CompletionMessage { get; internal set; } = string.Empty;
 
    /// <summary>
    /// The progress reporter that created this task.
    /// </summary>
    internal IPublishingActivityProgressReporter? Reporter { get; set; }
 
    /// <summary>
    /// Updates the status text of this task.
    /// </summary>
    /// <param name="statusText">The new status text.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public async Task UpdateAsync(string statusText, CancellationToken cancellationToken = default)
    {
        if (Reporter is null)
        {
            throw new InvalidOperationException("Cannot update task: Reporter is not set.");
        }
 
        await Reporter.UpdateTaskAsync(this, statusText, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Marks the task as completed successfully.
    /// </summary>
    /// <param name="completionMessage">Optional completion message.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public async Task CompleteAsync(string? completionMessage = null, CancellationToken cancellationToken = default)
    {
        if (Reporter is null)
        {
            throw new InvalidOperationException("Cannot complete task: Reporter is not set.");
        }
 
        await Reporter.CompleteTaskAsync(this, CompletionState.Completed, completionMessage, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Marks the task as completed with warnings.
    /// </summary>
    /// <param name="completionMessage">Optional completion message.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public async Task CompleteWithWarningAsync(string? completionMessage = null, CancellationToken cancellationToken = default)
    {
        if (Reporter is null)
        {
            throw new InvalidOperationException("Cannot complete task: Reporter is not set.");
        }
 
        await Reporter.CompleteTaskAsync(this, CompletionState.CompletedWithWarning, completionMessage, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Marks the task as failed with an error.
    /// </summary>
    /// <param name="completionMessage">Optional completion message.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public async Task FailAsync(string? completionMessage = null, CancellationToken cancellationToken = default)
    {
        if (Reporter is null)
        {
            throw new InvalidOperationException("Cannot fail task: Reporter is not set.");
        }
 
        await Reporter.CompleteTaskAsync(this, CompletionState.CompletedWithError, completionMessage, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Disposes the task, completing it successfully if not already completed.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Reporter is null || CompletionState != CompletionState.InProgress)
        {
            return;
        }
 
        // Auto-complete with success if not already completed
        await CompleteAsync(cancellationToken: CancellationToken.None).ConfigureAwait(false);
    }
}
 
/// <summary>
/// Interface for reporting publishing activity progress.
/// </summary>
[Experimental("ASPIREPUBLISHERS001", UrlFormat = "https://aka.ms/aspire/diagnostics/{0}")]
public interface IPublishingActivityProgressReporter
{
    /// <summary>
    /// Creates a new publishing step with the specified ID and title.
    /// </summary>
    /// <param name="title">The title of the publishing step.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>The publishing step</returns>
    /// <exception cref="InvalidOperationException">Thrown when a step with the same ID already exists.</exception>
    Task<PublishingStep> CreateStepAsync(string title, CancellationToken cancellationToken);
 
    /// <summary>
    /// Creates a new publishing task tied to a step.
    /// </summary>
    /// <param name="step">The step this task belongs to.</param>
    /// <param name="statusText">The initial status text.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>The publishing task</returns>
    /// <exception cref="InvalidOperationException">Thrown when the step does not exist or is already complete.</exception>
    Task<PublishingTask> CreateTaskAsync(PublishingStep step, string statusText, CancellationToken cancellationToken);
 
    /// <summary>
    /// Completes a publishing step with the specified completion text and state.
    /// </summary>
    /// <param name="step">The step to complete.</param>
    /// <param name="completionText">The completion text for the step.</param>
    /// <param name="completionState">The completion state for the step.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    Task CompleteStepAsync(PublishingStep step, string completionText, CompletionState completionState, CancellationToken cancellationToken = default);
 
    /// <summary>
    /// Updates the status text of an existing publishing task.
    /// </summary>
    /// <param name="task">The task to update.</param>
    /// <param name="statusText">The new status text.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <exception cref="InvalidOperationException">Thrown when the parent step is already complete.</exception>
    Task UpdateTaskAsync(PublishingTask task, string statusText, CancellationToken cancellationToken);
 
    /// <summary>
    /// Completes a publishing task with the specified completion state and optional completion message.
    /// </summary>
    /// <param name="task">The task to complete.</param>
    /// <param name="completionState">The completion state.</param>
    /// <param name="completionMessage">Optional completion message that will appear as a dimmed child message.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <exception cref="InvalidOperationException">Thrown when the parent step is already complete.</exception>
    Task CompleteTaskAsync(PublishingTask task, CompletionState completionState, string? completionMessage = null, CancellationToken cancellationToken = default);
 
    /// <summary>
    /// Signals that the entire publishing process has completed.
    /// </summary>
    /// <param name="completionState">The completion state of the publishing process. When null, the state is automatically aggregated from all steps.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    Task CompletePublishAsync(CompletionState? completionState = null, CancellationToken cancellationToken = default);
}
 
internal sealed class PublishingActivityProgressReporter : IPublishingActivityProgressReporter, IAsyncDisposable
{
    private readonly ConcurrentDictionary<string, PublishingStep> _steps = new();
    private readonly InteractionService _interactionService;
    private readonly CancellationTokenSource _cancellationTokenSource = new();
    private readonly Task _interactionServiceSubscriber;
 
    public PublishingActivityProgressReporter(InteractionService interactionService)
    {
        _interactionService = interactionService;
        _interactionServiceSubscriber = Task.Run(() => SubscribeToInteractionsAsync(_cancellationTokenSource.Token));
    }
 
    private static string ToBackchannelCompletionState(CompletionState state) => state switch
    {
        CompletionState.InProgress => CompletionStates.InProgress,
        CompletionState.Completed => CompletionStates.Completed,
        CompletionState.CompletedWithWarning => CompletionStates.CompletedWithWarning,
        CompletionState.CompletedWithError => CompletionStates.CompletedWithError,
        _ => CompletionStates.InProgress
    };
 
    public async Task<PublishingStep> CreateStepAsync(string title, CancellationToken cancellationToken)
    {
        var step = new PublishingStep(Guid.NewGuid().ToString(), title)
        {
            Reporter = this
        };
        _steps.TryAdd(step.Id, step);
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.Step,
            Data = new PublishingActivityData
            {
                Id = step.Id,
                StatusText = step.Title,
                CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
                StepId = null
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
        return step;
    }
 
    public async Task<PublishingTask> CreateTaskAsync(PublishingStep step, string statusText, CancellationToken cancellationToken)
    {
        if (!_steps.TryGetValue(step.Id, out var parentStep))
        {
            throw new InvalidOperationException($"Step with ID '{step.Id}' does not exist.");
        }
 
        lock (parentStep)
        {
            if (parentStep.CompletionState != CompletionState.InProgress)
            {
                throw new InvalidOperationException($"Cannot create task for step '{step.Id}' because the step is already complete.");
            }
        }
 
        var task = new PublishingTask(Guid.NewGuid().ToString(), step.Id, statusText, parentStep)
        {
            Reporter = this
        };
 
        // Add task to parent step
        parentStep.AddTask(task);
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.Task,
            Data = new PublishingActivityData
            {
                Id = task.Id,
                StatusText = statusText,
                CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
                StepId = step.Id
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
        return task;
    }
 
    public async Task CompleteStepAsync(PublishingStep step, string completionText, CompletionState completionState, CancellationToken cancellationToken = default)
    {
        lock (step)
        {
            step.CompletionState = completionState;
            step.CompletionText = completionText;
        }
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.Step,
            Data = new PublishingActivityData
            {
                Id = step.Id,
                StatusText = completionText,
                CompletionState = ToBackchannelCompletionState(completionState),
                StepId = null
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
    }
 
    public async Task UpdateTaskAsync(PublishingTask task, string statusText, CancellationToken cancellationToken)
    {
        if (!_steps.TryGetValue(task.StepId, out var parentStep))
        {
            throw new InvalidOperationException($"Parent step with ID '{task.StepId}' does not exist.");
        }
 
        lock (parentStep)
        {
            if (parentStep.CompletionState != CompletionState.InProgress)
            {
                throw new InvalidOperationException($"Cannot update task '{task.Id}' because its parent step '{task.StepId}' is already complete.");
            }
 
            task.StatusText = statusText;
        }
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.Task,
            Data = new PublishingActivityData
            {
                Id = task.Id,
                StatusText = statusText,
                CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
                StepId = task.StepId
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
    }
 
    public async Task CompleteTaskAsync(PublishingTask task, CompletionState completionState, string? completionMessage = null, CancellationToken cancellationToken = default)
    {
        if (!_steps.TryGetValue(task.StepId, out var parentStep))
        {
            throw new InvalidOperationException($"Parent step with ID '{task.StepId}' does not exist.");
        }
 
        if (task.CompletionState != CompletionState.InProgress)
        {
            throw new InvalidOperationException($"Cannot complete task '{task.Id}' with state '{task.CompletionState}'. Only 'InProgress' tasks can be completed.");
        }
 
        lock (parentStep)
        {
            if (parentStep.CompletionState != CompletionState.InProgress)
            {
                throw new InvalidOperationException($"Cannot complete task '{task.Id}' because its parent step '{task.StepId}' is already complete.");
            }
 
            task.CompletionState = completionState;
            task.CompletionMessage = completionMessage ?? string.Empty;
        }
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.Task,
            Data = new PublishingActivityData
            {
                Id = task.Id,
                StatusText = task.StatusText,
                CompletionState = ToBackchannelCompletionState(completionState),
                StepId = task.StepId,
                CompletionMessage = completionMessage
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
    }
 
    public async Task CompletePublishAsync(CompletionState? completionState = null, CancellationToken cancellationToken = default)
    {
        // Use provided state or aggregate from all steps
        var finalState = completionState ?? CalculateOverallAggregatedState();
 
        var state = new PublishingActivity
        {
            Type = PublishingActivityTypes.PublishComplete,
            Data = new PublishingActivityData
            {
                Id = PublishingActivityTypes.PublishComplete,
                StatusText = finalState switch
                {
                    CompletionState.Completed => "Publishing completed successfully",
                    CompletionState.CompletedWithWarning => "Publishing completed with warnings",
                    CompletionState.CompletedWithError => "Publishing completed with errors",
                    _ => "Publishing completed"
                },
                CompletionState = ToBackchannelCompletionState(finalState)
            }
        };
 
        await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
    }
 
    /// <summary>
    /// Calculates the overall completion state by aggregating all steps.
    /// </summary>
    private CompletionState CalculateOverallAggregatedState()
    {
        if (_steps.IsEmpty)
        {
            return CompletionState.Completed;
        }
 
        var maxState = CompletionState.InProgress;
        foreach (var step in _steps.Values)
        {
            var stepState = step.CompletionState;
            if ((int)stepState > (int)maxState)
            {
                maxState = stepState;
            }
        }
        return maxState;
    }
 
    private async Task SubscribeToInteractionsAsync(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var interaction in _interactionService.SubscribeInteractionUpdates(cancellationToken).ConfigureAwait(false))
            {
                await HandleInteractionUpdateAsync(interaction, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when cancellation is requested
        }
    }
 
    /// <summary>
    /// Checks if there are any steps currently in progress.
    /// </summary>
    private bool HasStepsInProgress()
    {
        return _steps.Any(step => step.Value.CompletionState == CompletionState.InProgress);
    }
 
    private async Task HandleInteractionUpdateAsync(Interaction interaction, CancellationToken cancellationToken)
    {
        // Only handle input interaction types
        if (interaction.InteractionInfo is not Interaction.InputsInteractionInfo inputsInfo || inputsInfo.Inputs.Count == 0)
        {
            return;
        }
 
        if (interaction.State == Interaction.InteractionState.InProgress)
        {
            if (HasStepsInProgress())
            {
                await _interactionService.CompleteInteractionAsync(interaction.InteractionId, (interaction, ServiceProvider, cancellationToken) =>
                {
                    // Complete the interaction with an error state
                    interaction.CompletionTcs.TrySetException(new InvalidOperationException("Cannot prompt interaction while steps are in progress."));
                    return Task.FromResult(new InteractionCompletionState
                    {
                        Complete = false,
                        State = "Cannot prompt interaction while steps are in progress."
                    });
                }, cancellationToken).ConfigureAwait(false);
                return;
            }
 
            var promptInputs = inputsInfo.Inputs.Select(input => new PublishingPromptInput
            {
                Label = input.Label,
                InputType = input.InputType.ToString(),
                Required = input.Required,
                Options = input.Options,
                Value = input.Value
            }).ToList();
 
            var activity = new PublishingActivity
            {
                Type = PublishingActivityTypes.Prompt,
                Data = new PublishingActivityData
                {
                    Id = interaction.InteractionId.ToString(CultureInfo.InvariantCulture),
                    StatusText = interaction.Message ?? $"{interaction.Title}: ",
                    CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
                    Inputs = promptInputs
                }
            };
 
            await ActivityItemUpdated.Writer.WriteAsync(activity, cancellationToken).ConfigureAwait(false);
        }
    }
 
    internal async Task CompleteInteractionAsync(string promptId, string?[]? responses, CancellationToken cancellationToken = default)
    {
        if (int.TryParse(promptId, CultureInfo.InvariantCulture, out var interactionId))
        {
            await _interactionService.CompleteInteractionAsync(interactionId,
                (interaction, serviceProvider, cancellationToken) =>
                {
                    if (interaction.InteractionInfo is Interaction.InputsInteractionInfo inputsInfo)
                    {
                        // Set values for all inputs if we have responses
                        if (responses is not null)
                        {
                            for (var i = 0; i < Math.Min(inputsInfo.Inputs.Count, responses.Length); i++)
                            {
                                inputsInfo.Inputs[i].SetValue(responses[i] ?? "");
                            }
                        }
 
                        return Task.FromResult(new InteractionCompletionState
                        {
                            Complete = true,
                            State = inputsInfo.Inputs
                        });
                    }
 
                    return Task.FromResult(new InteractionCompletionState
                    {
                        Complete = true,
                        State = null
                    });
                },
                cancellationToken).ConfigureAwait(false);
        }
    }
 
    public async ValueTask DisposeAsync()
    {
        if (!_cancellationTokenSource.IsCancellationRequested)
        {
            _cancellationTokenSource.Cancel();
        }
 
        try
        {
            await _interactionServiceSubscriber.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected when cancellation is requested
        }
 
        _cancellationTokenSource.Dispose();
    }
 
    internal Channel<PublishingActivity> ActivityItemUpdated { get; } = Channel.CreateUnbounded<PublishingActivity>();
}