File: Dcp\ResourceLogSource.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Microsoft.Extensions.Logging;
using System.Threading.Channels;
using Aspire.Hosting.Dcp.Model;
 
namespace Aspire.Hosting.Dcp;
 
using LogEntry = (string Content, bool IsErrorMessage);
using LogEntryList = IReadOnlyList<(string Content, bool IsErrorMessage)>;
 
internal sealed class ResourceLogSource<TResource>(
    ILogger logger,
    IKubernetesService kubernetesService,
    Version? dcpVersion,
    TResource resource) :
    IAsyncEnumerable<LogEntryList>
    where TResource : CustomResource
{
    public async IAsyncEnumerator<LogEntryList> GetAsyncEnumerator(CancellationToken cancellationToken)
    {
        if (!cancellationToken.CanBeCanceled)
        {
            throw new ArgumentException("Cancellation token must be cancellable in order to prevent leaking resources.", nameof(cancellationToken));
        }
 
        var channel = Channel.CreateUnbounded<LogEntry>(new UnboundedChannelOptions
        {
            AllowSynchronousContinuations = false,
            SingleReader = true,
            SingleWriter = false
        });
 
        var streamTasks = new List<Task>();
 
        if (resource is Container && dcpVersion?.CompareTo(DcpVersion.MinimumVersionAspire_8_1) >= 0)
        {
            var startupStderrStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStartupStdErr, follow: true, timestamps: true, cancellationToken).ConfigureAwait(false);
            var startupStdoutStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStartupStdOut, follow: true, timestamps: true, cancellationToken).ConfigureAwait(false);
 
            var startupStdoutStreamTask = Task.Run(() => StreamLogsAsync(startupStdoutStream, isError: false), cancellationToken);
            streamTasks.Add(startupStdoutStreamTask);
 
            var startupStderrStreamTask = Task.Run(() => StreamLogsAsync(startupStderrStream, isError: false), cancellationToken);
            streamTasks.Add(startupStderrStreamTask);
        }
 
        var stdoutStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStdOut, follow: true, timestamps: true, cancellationToken).ConfigureAwait(false);
        var stderrStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStdErr, follow: true, timestamps: true, cancellationToken).ConfigureAwait(false);
 
        var stdoutStreamTask = Task.Run(() => StreamLogsAsync(stdoutStream, isError: false), cancellationToken);
        streamTasks.Add(stdoutStreamTask);
 
        var stderrStreamTask = Task.Run(() => StreamLogsAsync(stderrStream, isError: true), cancellationToken);
        streamTasks.Add(stderrStreamTask);
 
        // End the enumeration when both streams have been read to completion.
        async Task WaitForStreamsToCompleteAsync()
        {
            await Task.WhenAll(streamTasks).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
            channel.Writer.TryComplete();
        }
 
        _ = WaitForStreamsToCompleteAsync();
        
        await foreach (var batch in channel.GetBatchesAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
        {
            yield return batch;
        }
 
        async Task StreamLogsAsync(Stream stream, bool isError)
        {
            try
            {
                using var sr = new StreamReader(stream, leaveOpen: false);
                while (!cancellationToken.IsCancellationRequested)
                {
                    var line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false);
                    if (line is null)
                    {
                        return; // No more data
                    }
 
                    var succeeded = channel.Writer.TryWrite((line, isError));
                    if (!succeeded)
                    {
                        logger.LogWarning("Failed to write log entry to channel. Logs for {Kind} {Name} may be incomplete", resource.Kind, resource.Metadata.Name);
                        channel.Writer.TryComplete();
                        return;
                    }
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Expected
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Unexpected error happened when capturing logs for {Kind} {Name}", resource.Kind, resource.Metadata.Name);
                channel.Writer.TryComplete(ex);
            }
        }
    }
}