|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
#pragma warning disable ASPIREPUBLISHERS001
#pragma warning disable ASPIREINTERACTION001
using System.Collections.Concurrent;
using System.Globalization;
using System.Threading.Channels;
using Aspire.Hosting.Backchannel;
using Aspire.Hosting.ApplicationModel;
namespace Aspire.Hosting.Publishing;
internal sealed class PublishingActivityReporter : IPublishingActivityReporter, IAsyncDisposable
{
private readonly ConcurrentDictionary<string, PublishingStep> _steps = new();
private readonly InteractionService _interactionService;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly Task _interactionServiceSubscriber;
public PublishingActivityReporter(InteractionService interactionService)
{
_interactionService = interactionService;
_interactionServiceSubscriber = Task.Run(() => SubscribeToInteractionsAsync(_cancellationTokenSource.Token));
}
private static string ToBackchannelCompletionState(CompletionState state) => state switch
{
CompletionState.InProgress => CompletionStates.InProgress,
CompletionState.Completed => CompletionStates.Completed,
CompletionState.CompletedWithWarning => CompletionStates.CompletedWithWarning,
CompletionState.CompletedWithError => CompletionStates.CompletedWithError,
_ => CompletionStates.InProgress
};
public async Task<IPublishingStep> CreateStepAsync(string title, CancellationToken cancellationToken = default)
{
var step = new PublishingStep(this, Guid.NewGuid().ToString(), title);
_steps.TryAdd(step.Id, step);
var state = new PublishingActivity
{
Type = PublishingActivityTypes.Step,
Data = new PublishingActivityData
{
Id = step.Id,
StatusText = step.Title,
CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
StepId = null
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
return step;
}
public async Task<PublishingTask> CreateTaskAsync(PublishingStep step, string statusText, CancellationToken cancellationToken)
{
if (!_steps.TryGetValue(step.Id, out var parentStep))
{
throw new InvalidOperationException($"Step with ID '{step.Id}' does not exist.");
}
lock (parentStep)
{
if (parentStep.CompletionState != CompletionState.InProgress)
{
throw new InvalidOperationException($"Cannot create task for step '{step.Id}' because the step is already complete.");
}
}
var task = new PublishingTask(Guid.NewGuid().ToString(), step.Id, statusText, parentStep);
// Add task to parent step
parentStep.AddTask(task);
var state = new PublishingActivity
{
Type = PublishingActivityTypes.Task,
Data = new PublishingActivityData
{
Id = task.Id,
StatusText = statusText,
CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
StepId = step.Id
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
return task;
}
public async Task CompleteStepAsync(PublishingStep step, string completionText, CompletionState completionState, CancellationToken cancellationToken)
{
lock (step)
{
// Prevent double completion if the step is already complete
if (step.CompletionState != CompletionState.InProgress)
{
throw new InvalidOperationException($"Cannot complete step '{step.Id}' with state '{step.CompletionState}'. Only 'InProgress' steps can be completed.");
}
step.CompletionState = completionState;
step.CompletionText = completionText;
}
var state = new PublishingActivity
{
Type = PublishingActivityTypes.Step,
Data = new PublishingActivityData
{
Id = step.Id,
StatusText = completionText,
CompletionState = ToBackchannelCompletionState(completionState),
StepId = null
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
}
public async Task UpdateTaskAsync(PublishingTask task, string statusText, CancellationToken cancellationToken)
{
if (!_steps.TryGetValue(task.StepId, out var parentStep))
{
throw new InvalidOperationException($"Parent step with ID '{task.StepId}' does not exist.");
}
lock (parentStep)
{
if (parentStep.CompletionState != CompletionState.InProgress)
{
throw new InvalidOperationException($"Cannot update task '{task.Id}' because its parent step '{task.StepId}' is already complete.");
}
task.StatusText = statusText;
}
var state = new PublishingActivity
{
Type = PublishingActivityTypes.Task,
Data = new PublishingActivityData
{
Id = task.Id,
StatusText = statusText,
CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
StepId = task.StepId
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
}
public async Task CompleteTaskAsync(PublishingTask task, CompletionState completionState, string? completionMessage, CancellationToken cancellationToken)
{
if (!_steps.TryGetValue(task.StepId, out var parentStep))
{
throw new InvalidOperationException($"Parent step with ID '{task.StepId}' does not exist.");
}
if (task.CompletionState != CompletionState.InProgress)
{
throw new InvalidOperationException($"Cannot complete task '{task.Id}' with state '{task.CompletionState}'. Only 'InProgress' tasks can be completed.");
}
lock (parentStep)
{
if (parentStep.CompletionState != CompletionState.InProgress)
{
throw new InvalidOperationException($"Cannot complete task '{task.Id}' because its parent step '{task.StepId}' is already complete.");
}
task.CompletionState = completionState;
task.CompletionMessage = completionMessage ?? string.Empty;
}
var state = new PublishingActivity
{
Type = PublishingActivityTypes.Task,
Data = new PublishingActivityData
{
Id = task.Id,
StatusText = task.StatusText,
CompletionState = ToBackchannelCompletionState(completionState),
StepId = task.StepId,
CompletionMessage = completionMessage
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
}
public async Task CompletePublishAsync(string? completionMessage = null, CompletionState? completionState = null, CancellationToken cancellationToken = default)
{
// Use provided state or aggregate from all steps
var finalState = completionState ?? CalculateOverallAggregatedState();
var state = new PublishingActivity
{
Type = PublishingActivityTypes.PublishComplete,
Data = new PublishingActivityData
{
Id = PublishingActivityTypes.PublishComplete,
StatusText = completionMessage ?? finalState switch
{
CompletionState.Completed => "Publishing completed successfully",
CompletionState.CompletedWithWarning => "Publishing completed with warnings",
CompletionState.CompletedWithError => "Publishing completed with errors",
_ => "Publishing completed"
},
CompletionState = ToBackchannelCompletionState(finalState)
}
};
await ActivityItemUpdated.Writer.WriteAsync(state, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Calculates the overall completion state by aggregating all steps.
/// </summary>
private CompletionState CalculateOverallAggregatedState()
{
if (_steps.IsEmpty)
{
return CompletionState.Completed;
}
var maxState = CompletionState.InProgress;
foreach (var step in _steps.Values)
{
var stepState = step.CompletionState;
if ((int)stepState > (int)maxState)
{
maxState = stepState;
}
}
return maxState;
}
private async Task SubscribeToInteractionsAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var interaction in _interactionService.SubscribeInteractionUpdates(cancellationToken).ConfigureAwait(false))
{
await HandleInteractionUpdateAsync(interaction, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
}
}
/// <summary>
/// Checks if there are any steps currently in progress.
/// </summary>
private bool HasStepsInProgress()
{
return _steps.Any(step => step.Value.CompletionState == CompletionState.InProgress);
}
private async Task HandleInteractionUpdateAsync(Interaction interaction, CancellationToken cancellationToken)
{
// Only handle input interaction types
if (interaction.InteractionInfo is not Interaction.InputsInteractionInfo inputsInfo || inputsInfo.Inputs.Count == 0)
{
return;
}
if (interaction.State == Interaction.InteractionState.InProgress)
{
if (HasStepsInProgress())
{
await _interactionService.CompleteInteractionAsync(interaction.InteractionId, (interaction, ServiceProvider) =>
{
// Complete the interaction with an error state
interaction.CompletionTcs.TrySetException(new InvalidOperationException("Cannot prompt interaction while steps are in progress."));
return new InteractionCompletionState
{
Complete = true,
State = "Cannot prompt interaction while steps are in progress."
};
}, cancellationToken).ConfigureAwait(false);
return;
}
var promptInputs = inputsInfo.Inputs.Select(input => new PublishingPromptInput
{
Label = input.Label,
InputType = input.InputType.ToString(),
Required = input.Required,
Options = input.Options,
Value = input.Value,
ValidationErrors = input.ValidationErrors
}).ToList();
var activity = new PublishingActivity
{
Type = PublishingActivityTypes.Prompt,
Data = new PublishingActivityData
{
Id = interaction.InteractionId.ToString(CultureInfo.InvariantCulture),
StatusText = interaction.Message ?? $"{interaction.Title}: ",
CompletionState = ToBackchannelCompletionState(CompletionState.InProgress),
Inputs = promptInputs
}
};
await ActivityItemUpdated.Writer.WriteAsync(activity, cancellationToken).ConfigureAwait(false);
}
}
internal async Task CompleteInteractionAsync(string promptId, string?[]? responses, CancellationToken cancellationToken = default)
{
if (int.TryParse(promptId, CultureInfo.InvariantCulture, out var interactionId))
{
await _interactionService.CompleteInteractionAsync(interactionId,
(interaction, serviceProvider) =>
{
if (interaction.InteractionInfo is Interaction.InputsInteractionInfo inputsInfo)
{
// Set values for all inputs if we have responses
if (responses is not null)
{
for (var i = 0; i < Math.Min(inputsInfo.Inputs.Count, responses.Length); i++)
{
inputsInfo.Inputs[i].SetValue(responses[i] ?? "");
}
}
return new InteractionCompletionState
{
Complete = true,
State = inputsInfo.Inputs
};
}
return new InteractionCompletionState
{
Complete = true,
State = null
};
},
cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask DisposeAsync()
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}
try
{
await _interactionServiceSubscriber.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
}
_cancellationTokenSource.Dispose();
}
internal Channel<PublishingActivity> ActivityItemUpdated { get; } = Channel.CreateUnbounded<PublishingActivity>();
}
|