// File: TestConnection.cs
// Web Access
// Project: src\src\SignalR\clients\csharp\Client\test\UnitTests\Microsoft.AspNetCore.SignalR.Client.Tests.csproj (Microsoft.AspNetCore.SignalR.Client.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
 
namespace Microsoft.AspNetCore.SignalR.Client.Tests;
 
/// <summary>
/// In-memory <see cref="ConnectionContext"/> used by the client unit tests. The connection owns a
/// duplex pipe pair: <see cref="Transport"/> is the end handed to the code under test, while
/// <see cref="Application"/> is the end the test uses to inject incoming data and to observe what was sent.
/// </summary>
internal class TestConnection : ConnectionContext, IConnectionInherentKeepAliveFeature
{
    private readonly bool _autoHandshake;

    // RunContinuationsAsynchronously prevents code awaiting Started/Disposed from being resumed inline
    // in the middle of StartAsync/DisposeCoreAsync (i.e. before the onStart/onDispose callbacks have run).
    private readonly TaskCompletionSource _started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly TaskCompletionSource _disposed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    private int _disposeCount = 0;

    /// <summary>Completes once <see cref="StartAsync"/> has been called.</summary>
    public Task Started => _started.Task;

    /// <summary>Completes once <see cref="DisposeAsync"/> has been called.</summary>
    public Task Disposed => _disposed.Task;

    private readonly Func<Task> _onStart;
    private readonly Func<Task> _onDispose;
    private readonly bool _hasInherentKeepAlive;

    public override string ConnectionId { get; set; }

    /// <summary>The test-side end of the pipe pair (the counterpart of <see cref="Transport"/>).</summary>
    public IDuplexPipe Application { get; }

    public override IDuplexPipe Transport { get; set; }

    public override IFeatureCollection Features { get; } = new FeatureCollection();

    /// <summary>Number of times the connection has been disposed.</summary>
    public int DisposeCount => _disposeCount;

    public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();

    bool IConnectionInherentKeepAliveFeature.HasInherentKeepAlive => _hasInherentKeepAlive;

    /// <summary>
    /// Creates the connection and its backing pipe pair.
    /// </summary>
    /// <param name="onStart">Callback awaited by <see cref="StartAsync"/>; defaults to a no-op.</param>
    /// <param name="onDispose">Callback awaited during disposal; defaults to a no-op.</param>
    /// <param name="autoHandshake">
    /// When <c>true</c>, <see cref="StartAsync"/> kicks off <see cref="ReadHandshakeAndSendResponseAsync"/> in the background.
    /// </param>
    /// <param name="hasInherentKeepAlive">Value reported through <see cref="IConnectionInherentKeepAliveFeature"/>.</param>
    /// <param name="pipeOptions">
    /// Options used for both directions of the pipe pair; defaults to inline schedulers without a synchronization context.
    /// </param>
    public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoHandshake = true, bool hasInherentKeepAlive = false, PipeOptions pipeOptions = null)
    {
        _autoHandshake = autoHandshake;
        _onStart = onStart ?? (() => Task.CompletedTask);
        _onDispose = onDispose ?? (() => Task.CompletedTask);
        _hasInherentKeepAlive = hasInherentKeepAlive;

        var options = pipeOptions ?? new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);

        var pair = DuplexPipe.CreateConnectionPair(options, options);
        Application = pair.Application;
        Transport = pair.Transport;

        Features.Set<IConnectionInherentKeepAliveFeature>(this);
    }

    public override ValueTask DisposeAsync() => DisposeCoreAsync();

    /// <summary>
    /// Marks the connection as started, awaits the <c>onStart</c> callback and, when auto-handshake is
    /// enabled, starts answering the handshake in the background.
    /// </summary>
    /// <returns>This connection.</returns>
    public async ValueTask<ConnectionContext> StartAsync()
    {
        _started.TrySetResult();

        await _onStart();

        if (_autoHandshake)
        {
            // We can't await this as it will block StartAsync which will block
            // HubConnection.StartAsync which sends the Handshake in the first place!
            _ = ReadHandshakeAndSendResponseAsync();
        }

        return this;
    }

    /// <summary>
    /// Reads the next sent text message (expected to be the handshake request) and replies with an
    /// empty handshake response.
    /// </summary>
    /// <param name="minorVersion">Currently unused; kept so existing callers keep compiling.</param>
    /// <returns>The text of the message that was read.</returns>
    public async Task<string> ReadHandshakeAndSendResponseAsync(int minorVersion = 0)
    {
        var handshakeRequest = await ReadSentTextMessageAsync();

        byte[] response;

        var output = MemoryBufferWriter.Get();
        try
        {
            HandshakeProtocol.WriteResponseMessage(HandshakeResponseMessage.Empty, output);
            response = output.ToArray();
        }
        finally
        {
            // Always hand the writer back, even if serialization throws.
            MemoryBufferWriter.Return(output);
        }

        await Application.Output.WriteAsync(response);

        return handshakeRequest;
    }

    /// <summary>
    /// Serializes <paramref name="jsonObject"/> to JSON, appends the record separator and writes the
    /// resulting frame to <see cref="Application"/>'s output.
    /// </summary>
    public Task ReceiveJsonMessage(object jsonObject)
    {
        var json = JsonConvert.SerializeObject(jsonObject, Formatting.None);
        var frame = FormatMessageToArray(Encoding.UTF8.GetBytes(json));

        return ReceiveBytesAsync(frame);
    }

    /// <summary>Writes <paramref name="rawText"/> as UTF-8, without any framing, to <see cref="Application"/>'s output.</summary>
    public Task ReceiveTextAsync(string rawText)
    {
        return ReceiveBytesAsync(Encoding.UTF8.GetBytes(rawText));
    }

    /// <summary>Writes <paramref name="bytes"/> as-is to <see cref="Application"/>'s output.</summary>
    public Task ReceiveBytesAsync(byte[] bytes)
    {
        return Application.Output.WriteAsync(bytes).AsTask();
    }

    /// <summary>
    /// Reads a single text message from the Application Input pipe.
    /// </summary>
    /// <param name="ignorePings">When <c>true</c>, messages whose <c>type</c> is the ping message type are skipped.</param>
    /// <returns>The message text, or <c>null</c> when the pipe completed without another full message.</returns>
    public async Task<string> ReadSentTextMessageAsync(bool ignorePings = true)
    {
        while (true)
        {
            var message = await ReadSentTextMessageAsyncInner();
            if (message is null)
            {
                return null;
            }

            var messageType = (int?)JObject.Parse(message)["type"];
            var isPing = messageType == HubProtocolConstants.PingMessageType;

            if (ignorePings && isPing)
            {
                continue;
            }

            return message;
        }
    }

    /// <summary>
    /// Reads from <see cref="Application"/>'s input until one complete text frame is available.
    /// </summary>
    /// <returns>The frame payload decoded as UTF-8, or <c>null</c> if the pipe completed first.</returns>
    private async Task<string> ReadSentTextMessageAsyncInner()
    {
        while (true)
        {
            var result = await Application.Input.ReadAsync();
            var buffer = result.Buffer;
            var consumed = buffer.Start;

            // Until a full frame is parsed, report the whole buffer as examined. Otherwise the next
            // ReadAsync returns the same partial data immediately and this loop spins without waiting.
            var examined = buffer.End;

            try
            {
                if (TextMessageParser.TryParseMessage(ref buffer, out var payload))
                {
                    // The parser advanced 'buffer' past the frame; anything after it has not been
                    // looked at yet and must stay immediately available to the next read.
                    consumed = buffer.Start;
                    examined = consumed;
                    return Encoding.UTF8.GetString(payload.ToArray());
                }
                else if (result.IsCompleted)
                {
                    await Application.Output.CompleteAsync();
                    return null;
                }
            }
            finally
            {
                Application.Input.AdvanceTo(consumed, examined);
            }
        }
    }

    /// <summary>Reads the next sent text message and parses it as a JSON object.</summary>
    public async Task<JObject> ReadSentJsonAsync()
    {
        return JObject.Parse(await ReadSentTextMessageAsync());
    }

    /// <summary>
    /// Drains every remaining sent message. May only be called after the connection has been disposed.
    /// </summary>
    /// <param name="ignorePings">Forwarded to <see cref="ReadSentTextMessageAsync"/>.</param>
    /// <exception cref="InvalidOperationException">The connection has not been disposed yet.</exception>
    public async Task<IList<string>> ReadAllSentMessagesAsync(bool ignorePings = true)
    {
        if (!Disposed.IsCompleted)
        {
            throw new InvalidOperationException("The connection must be stopped before this method can be used.");
        }

        var messages = new List<string>();

        string message;
        while ((message = await ReadSentTextMessageAsync(ignorePings)) is not null)
        {
            messages.Add(message);
        }

        return messages;
    }

    /// <summary>Completes <see cref="Application"/>'s output, optionally with <paramref name="ex"/> as the error.</summary>
    public void CompleteFromTransport(Exception ex = null)
    {
        Application.Output.Complete(ex);
    }

    // Shared disposal path. 'ex' is accepted but not currently used.
    private async ValueTask DisposeCoreAsync(Exception ex = null)
    {
        Interlocked.Increment(ref _disposeCount);
        _disposed.TrySetResult();
        await _onDispose();

        // Simulate HttpConnection's behavior by Completing the Transport pipe.
        Transport.Input.Complete();
        Transport.Output.Complete();
    }

    // Returns a copy of 'message' with the text-protocol record separator appended.
    private static byte[] FormatMessageToArray(byte[] message)
    {
        var framed = new byte[message.Length + 1];
        message.CopyTo(framed, 0);
        framed[message.Length] = TextMessageFormatter.RecordSeparator;
        return framed;
    }
}