File: TestTransport.cs
Web Access
Project: src\src\SignalR\clients\csharp\Client\test\UnitTests\Microsoft.AspNetCore.SignalR.Client.Tests.csproj (Microsoft.AspNetCore.SignalR.Client.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
 
namespace Microsoft.AspNetCore.SignalR.Client.Tests;
 
public class TestTransport : ITransport
{
    private readonly Func<Task> _stopHandler;
    private readonly Func<Task> _startHandler;
 
    public TransferFormat? Format { get; }
    public IDuplexPipe Application { get; private set; }
    public Task Receiving { get; private set; }
 
    private IDuplexPipe _transport;
 
    public PipeReader Input => _transport.Input;
 
    public PipeWriter Output => _transport.Output;
 
    public TestTransport(Func<Task> onTransportStop = null, Func<Task> onTransportStart = null, TransferFormat transferFormat = TransferFormat.Text)
    {
        _stopHandler = onTransportStop ?? new Func<Task>(() => Task.CompletedTask);
        _startHandler = onTransportStart ?? new Func<Task>(() => Task.CompletedTask);
        Format = transferFormat;
    }
 
    public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
    {
        if ((Format & transferFormat) == 0)
        {
            throw new InvalidOperationException($"The '{transferFormat}' transfer format is not supported by this transport.");
        }
 
        var options = ClientPipeOptions.DefaultOptions;
        var pair = DuplexPipe.CreateConnectionPair(options, options);
 
        _transport = pair.Transport;
        Application = pair.Application;
        await _startHandler();
 
        // To test canceling the token from the onTransportStart Func.
        cancellationToken.ThrowIfCancellationRequested();
 
        // Start a loop to read from the pipe
        Receiving = ReceiveLoop();
        async Task ReceiveLoop()
        {
            while (true)
            {
                var result = await Application.Input.ReadAsync();
                if (result.IsCompleted)
                {
                    break;
                }
                else if (result.IsCanceled)
                {
                    // This is useful for detecting that the connection tried to gracefully terminate.
                    // If the Receiving task is faulted/canceled, it means StopAsync was the thing that
                    // actually terminated the connection (not ideal, we want the transport pipe to
                    // shut down gracefully)
                    throw new OperationCanceledException();
                }
 
                Application.Input.AdvanceTo(result.Buffer.End);
            }
 
            // Call the transport stop handler
            await _stopHandler();
 
            // Complete our end of the pipe
            Application.Output.Complete();
            Application.Input.Complete();
        }
    }
 
    public Task StopAsync()
    {
        _transport.Output.Complete();
        _transport.Input.Complete();
 
        Application.Input.CancelPendingRead();
        return Receiving;
    }
}