File: ChannelExtensions.cs
Web Access
Project: src\src\SignalR\common\testassets\Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj (Microsoft.AspNetCore.SignalR.Tests.Utils)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
namespace System.Threading.Channels;
public static class ChannelExtensions
    public static async Task<List<T>> ReadAndCollectAllAsync<T>(this ChannelReader<T> channel, bool suppressExceptions = false)
        var list = new List<T>();
            while (await channel.WaitToReadAsync())
                while (channel.TryRead(out var item))
            // Manifest any error from channel.Completion (which should be completed now)
            if (!suppressExceptions)
                await channel.Completion;
        catch (Exception) when (suppressExceptions)
            // Suppress the exception
        return list;
    public static async Task<List<T>> ReadAtLeastAsync<T>(this ChannelReader<T> reader, int minimumCount, CancellationToken cancellationToken = default)
        if (minimumCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(minimumCount), "minimumCount must be greater than zero.");
        var items = new List<T>();
        while (items.Count < minimumCount)
            while (reader.TryRead(out var item))
                if (items.Count >= minimumCount)
                    return items;
                var readTask = reader.WaitToReadAsync(cancellationToken).AsTask();
                if (!await readTask.ConfigureAwait(false))
                    throw new InvalidOperationException($"Channel ended after writing {items.Count} items.");
            catch (OperationCanceledException)
                throw new OperationCanceledException($"ReadAtLeastAsync canceled with {items.Count} of {minimumCount} items.");
        return items;