File: src\Shared\ChannelExtensions.cs
Web Access
Project: src\src\Aspire.Dashboard\Aspire.Dashboard.csproj (Aspire.Dashboard)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Runtime.CompilerServices;
using System.Threading.Channels;
 
namespace Aspire;
 
internal static class ChannelExtensions
{
    /// <summary>
    /// Reads batches, grabbing all available messages and returning them as one batch before yielding.
    /// This can give a better downstream experience, as there's less per-item overhead.
    /// </summary>
    /// <remarks>
    /// This sequence adopts the lifetime of <paramref name="channel"/>.
    /// Callers are required to either use a channel that will complete, or to pass a cancellation
    /// token which will also cancel the sequence returned by this method.
    /// </remarks>
    /// <typeparam name="T">The type of items in the channel and returned batch.</typeparam>
    /// <param name="channel">The channel to read values from.</param>
    /// <param name="minReadInterval">The minimum read interval. The enumerable will wait this long before returning the next available result.</param>
    /// <param name="cancellationToken">A token that signals a loss of interest in the operation.</param>
    /// <returns></returns>
    public static async IAsyncEnumerable<IReadOnlyList<T>> GetBatchesAsync<T>(
        this Channel<T> channel,
        TimeSpan? minReadInterval = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        DateTime? lastRead = null;
 
        while (!cancellationToken.IsCancellationRequested)
        {
            List<T>? batch = null;
            // Wait until there's something to read, or the channel closes.
            if (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                if (minReadInterval != null && lastRead != null)
                {
                    var s = lastRead.Value.Add(minReadInterval.Value) - DateTime.UtcNow;
                    if (s > TimeSpan.Zero)
                    {
                        await Task.Delay(s, cancellationToken).ConfigureAwait(false);
                    }
                }
 
                // Read everything in the channel into a batch.
                while (!cancellationToken.IsCancellationRequested && channel.Reader.TryRead(out var log))
                {
                    batch ??= [];
                    batch.Add(log);
                }
 
                if (!cancellationToken.IsCancellationRequested && batch is not null)
                {
                    lastRead = DateTime.UtcNow;
                    yield return batch;
                }
            }
            else
            {
                // The channel completed, so there'll be no further data.
                break;
            }
        }
    }
}