File: ObservableExtensions.cs
Web Access
Project: src\src\SignalR\samples\SignalRSamples\SignalRSamples.csproj (SignalRSamples)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Reactive.Linq;
using System.Threading.Channels;
 
namespace SignalRSamples;
 
/// <summary>
/// Extension methods for exposing an <see cref="IObservable{T}"/> as a <see cref="ChannelReader{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Subscribes to <paramref name="observable"/> and forwards every notification into a channel,
    /// returning that channel's reader.
    /// </summary>
    /// <typeparam name="T">The type of the items produced by the observable.</typeparam>
    /// <param name="observable">The source sequence to subscribe to.</param>
    /// <param name="connectionAborted">
    /// A token whose cancellation completes the channel and disposes the subscription.
    /// </param>
    /// <param name="maxBufferSize">
    /// The capacity of the channel, or <see langword="null"/> to create an unbounded channel.
    /// </param>
    /// <returns>
    /// A reader that yields the observed items and is completed (with the error, if any) when the
    /// observable terminates or <paramref name="connectionAborted"/> is canceled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// This sample adapts an observable to a <see cref="ChannelReader{T}"/> without back pressure:
    /// <list type="bullet">
    /// <item>With an unbounded channel, memory grows if the connection is slower than the producer.</item>
    /// <item>With a bounded channel, <c>TryWrite</c> returns false once the buffer is full, which effectively drops items.</item>
    /// </list>
    /// The other alternative, blocking on <c>WaitToWriteAsync</c> when a bounded channel is full, would
    /// block a thread pool thread; it isn't recommended and isn't shown here.
    /// </remarks>
    public static ChannelReader<T> AsChannelReader<T>(
        this IObservable<T> observable,
        CancellationToken connectionAborted,
        int? maxBufferSize = null
    )
    {
        ArgumentNullException.ThrowIfNull(observable);

        var channel = maxBufferSize is int capacity
            ? Channel.CreateBounded<T>(capacity)
            : Channel.CreateUnbounded<T>();
        var writer = channel.Writer;

        // The Try* methods are used throughout so that notifications arriving after the channel
        // has been completed (or while a bounded channel is full) are ignored instead of throwing.
        var subscription = observable.Subscribe(
            value => writer.TryWrite(value),
            error => writer.TryComplete(error),
            () => writer.TryComplete());

        // Dispose the subscription directly on abort. Reader.Completion only finishes once every
        // buffered item has been read, so if the consumer stops reading after the abort the
        // continuation below would never run and the subscription would stay alive.
        var abortRegistration = connectionAborted.Register(() =>
        {
            writer.TryComplete();
            subscription.Dispose();
        });

        // Release the subscription and the token registration once the reader has completed.
        // The scheduler is passed explicitly so the continuation does not depend on whatever
        // TaskScheduler.Current happens to be for the caller.
        _ = channel.Reader.Completion.ContinueWith(
            completion =>
            {
                subscription.Dispose();
                abortRegistration.Dispose();
            },
            TaskScheduler.Default);

        return channel.Reader;
    }
}