File: HubConnectionTests.ConnectionLifecycle.cs
Web Access
Project: src\src\SignalR\clients\csharp\Client\test\UnitTests\Microsoft.AspNetCore.SignalR.Client.Tests.csproj (Microsoft.AspNetCore.SignalR.Client.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.InternalTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Testing;
using Newtonsoft.Json.Linq;
using Xunit;
 
namespace Microsoft.AspNetCore.SignalR.Client.Tests;
 
public partial class HubConnectionTests
{
    public class ConnectionLifecycle : VerifiableLoggedTest
    {
        // This tactic (using names and a dictionary) allows non-serializable data (like a Func) to be used in a theory AND get it to show in the new hierarchical view in Test Explorer as separate tests you can run individually.
        private static readonly IDictionary<string, Func<HubConnection, Task>> MethodsThatRequireActiveConnection = new Dictionary<string, Func<HubConnection, Task>>()
            {
                { nameof(HubConnection.InvokeCoreAsync), (connection) => connection.InvokeAsync("Foo") },
                { nameof(HubConnection.SendCoreAsync), (connection) => connection.SendAsync("Foo") },
                { nameof(HubConnection.StreamAsChannelCoreAsync), (connection) => connection.StreamAsChannelAsync<object>("Foo") },
            };
 
        public static IEnumerable<object[]> MethodsNamesThatRequireActiveConnection => MethodsThatRequireActiveConnection.Keys.Select(k => new object[] { k });
 
        [Fact]
        public async Task StartAsyncStartsTheUnderlyingConnection()
        {
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
                Assert.Equal(HubConnectionState.Connected, connection.State);
            });
        }
 
        [Fact]
        public async Task StartAsyncThrowsIfPreviousStartIsAlreadyStarting()
        {
            // Set up StartAsync to wait on the syncPoint when starting
            var testConnection = new TestConnection(onStart: SyncPoint.Create(out var syncPoint));
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                var firstStart = connection.StartAsync();
                Assert.False(firstStart.IsCompleted);
 
                // Wait for us to be in IConnectionFactory.ConnectAsync
                await syncPoint.WaitForSyncPoint().DefaultTimeout();
 
                // Try starting again
                var secondStart = connection.StartAsync();
                Assert.False(secondStart.IsCompleted);
 
                // Release the sync point
                syncPoint.Continue();
 
                // The first start should finish fine, but the second throws an InvalidOperationException.
                await firstStart.DefaultTimeout();
                await Assert.ThrowsAsync<InvalidOperationException>(() => secondStart).DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task StartingAfterStopCreatesANewConnection()
        {
            // Set up StartAsync to wait on the syncPoint when starting
            var createCount = 0;
            ValueTask<ConnectionContext> ConnectionFactory(EndPoint endPoint)
            {
                createCount += 1;
                return new TestConnection().StartAsync();
            }
            var builder = new HubConnectionBuilder().WithUrl("http://example.com");
            var delegateConnectionFactory = new DelegateConnectionFactory(ConnectionFactory);
            builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
 
            await AsyncUsing(builder.Build(), async connection =>
            {
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.Equal(1, createCount);
                Assert.Equal(HubConnectionState.Connected, connection.State);
 
                await connection.StopAsync().DefaultTimeout();
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.Equal(2, createCount);
                Assert.Equal(HubConnectionState.Connected, connection.State);
            });
        }
 
        [Fact]
        public async Task StartingDuringStopCreatesANewConnection()
        {
            // Set up StartAsync to wait on the syncPoint when starting
            var createCount = 0;
            var onDisposeForFirstConnection = SyncPoint.Create(out var syncPoint);
            ValueTask<ConnectionContext> ConnectionFactory(EndPoint endPoint)
            {
                createCount += 1;
                return new TestConnection(onDispose: createCount == 1 ? onDisposeForFirstConnection : null).StartAsync();
            }
 
            var builder = new HubConnectionBuilder().WithUrl("http://example.com");
            var delegateConnectionFactory = new DelegateConnectionFactory(ConnectionFactory);
            builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
 
            await AsyncUsing(builder.Build(), async connection =>
            {
                await connection.StartAsync().DefaultTimeout();
                Assert.Equal(1, createCount);
 
                var stopTask = connection.StopAsync();
 
                // Wait to hit DisposeAsync on TestConnection (which should be after StopAsync has cleared the connection state)
                await syncPoint.WaitForSyncPoint().DefaultTimeout();
 
                // We should not yet be able to start now because StopAsync hasn't completed
                Assert.False(stopTask.IsCompleted);
                var startTask = connection.StartAsync();
                Assert.False(stopTask.IsCompleted);
 
                // When we release the sync point, the StopAsync task will finish
                syncPoint.Continue();
                await stopTask.DefaultTimeout();
 
                // Which will then allow StartAsync to finish.
                await startTask.DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task StartAsyncWithFailedHandshakeCanBeStopped()
        {
            var handshakeConnectionErrorLogged = false;
 
            bool ExpectedErrors(WriteContext writeContext)
            {
                if (writeContext.LoggerName == typeof(HubConnection).FullName)
                {
                    if (writeContext.EventId.Name == "ErrorReceivingHandshakeResponse")
                    {
                        handshakeConnectionErrorLogged = true;
                        return true;
                    }
 
                    return writeContext.EventId.Name == "ErrorStartingConnection";
                }
 
                return false;
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var testConnection = new TestConnection(autoHandshake: false);
                await AsyncUsing(CreateHubConnection(testConnection, loggerFactory: LoggerFactory), async connection =>
                {
                    testConnection.Transport.Input.Complete();
                    try
                    {
                        await connection.StartAsync().DefaultTimeout();
                    }
                    catch
                    { }
 
                    await connection.StopAsync().DefaultTimeout();
                    Assert.True(testConnection.Started.IsCompleted);
                });
            }
 
            Assert.True(handshakeConnectionErrorLogged, "The connnection error during the handshake wasn't logged.");
        }
 
        [Theory]
        [MemberData(nameof(MethodsNamesThatRequireActiveConnection))]
        public async Task MethodsThatRequireStartedConnectionFailIfConnectionNotYetStarted(string name)
        {
            var method = MethodsThatRequireActiveConnection[name];
 
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => method(connection)).DefaultTimeout();
                Assert.Equal($"The '{name}' method cannot be called if the connection is not active", ex.Message);
            });
        }
 
        [Theory]
        [MemberData(nameof(MethodsNamesThatRequireActiveConnection))]
        public async Task MethodsThatRequireStartedConnectionWaitForStartIfConnectionIsCurrentlyStarting(string name)
        {
            var method = MethodsThatRequireActiveConnection[name];
 
            // Set up StartAsync to wait on the syncPoint when starting
            var testConnection = new TestConnection(onStart: SyncPoint.Create(out var syncPoint));
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                // Start, and wait for the sync point to be hit
                var startTask = connection.StartAsync();
                Assert.False(startTask.IsCompleted);
                await syncPoint.WaitForSyncPoint().DefaultTimeout();
 
                // Run the method, but it will be waiting for the lock
                var targetTask = method(connection);
 
                // Release the SyncPoint
                syncPoint.Continue();
 
                // Wait for start to finish
                await startTask.DefaultTimeout();
 
                // We need some special logic to ensure InvokeAsync completes.
                if (string.Equals(name, nameof(HubConnection.InvokeCoreAsync)))
                {
                    await ForceLastInvocationToComplete(testConnection).DefaultTimeout();
                }
 
                // Wait for the method to complete.
                await targetTask.DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task StatusIsNotConnectedUntilStartAsyncIsFinished()
        {
            // Set up StartAsync to wait on the syncPoint when starting
            var testConnection = new TestConnection(onStart: SyncPoint.Create(out var syncPoint));
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                // Start, and wait for the sync point to be hit
                var startTask = connection.StartAsync();
                Assert.False(startTask.IsCompleted);
                await syncPoint.WaitForSyncPoint().DefaultTimeout();
 
                Assert.Equal(HubConnectionState.Connecting, connection.State);
 
                // Release the SyncPoint
                syncPoint.Continue();
 
                // Wait for start to finish
                await startTask.DefaultTimeout();
 
                Assert.Equal(HubConnectionState.Connected, connection.State);
            });
        }
 
        [Fact]
        public async Task StatusIsDisconnectedInCloseEvent()
        {
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                var closed = new TaskCompletionSource();
                connection.Closed += exception =>
                {
                    closed.TrySetResult();
                    Assert.Equal(HubConnectionState.Disconnected, connection.State);
                    return Task.CompletedTask;
                };
 
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
                Assert.Equal(HubConnectionState.Connected, connection.State);
 
                await connection.StopAsync().DefaultTimeout();
                await testConnection.Disposed.DefaultTimeout();
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await closed.Task.DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task StopAsyncStopsConnection()
        {
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
 
                await connection.StopAsync().DefaultTimeout();
                await testConnection.Disposed.DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task StopAsyncNoOpsIfConnectionNotYetStarted()
        {
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                await connection.StopAsync().DefaultTimeout();
                Assert.False(testConnection.Disposed.IsCompleted);
            });
        }
 
        [Fact]
        public async Task StopAsyncNoOpsIfConnectionAlreadyStopped()
        {
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
                Assert.Equal(HubConnectionState.Connected, connection.State);
 
                await connection.StopAsync().DefaultTimeout();
                await testConnection.Disposed.DefaultTimeout();
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StopAsync().DefaultTimeout();
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
            });
        }
 
        [Fact]
        public async Task StopAsyncOnInactiveConnectionDoesNotAffectNextStartAsync()
        {
            // Regression test:
            // If there wasn't an active underlying connection, StopAsync would leave a CTS canceled which would cause the next StartAsync to fail
            var testConnection = new TestConnection();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StopAsync().DefaultTimeout();
                Assert.False(testConnection.Disposed.IsCompleted);
                Assert.Equal(HubConnectionState.Disconnected, connection.State);
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
                Assert.Equal(HubConnectionState.Connected, connection.State);
            });
        }
 
        [Fact]
        public async Task CompletingTheTransportSideMarksConnectionAsClosed()
        {
            var testConnection = new TestConnection();
            var closed = new TaskCompletionSource();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                connection.Closed += (e) =>
                {
                    closed.TrySetResult();
                    return Task.CompletedTask;
                };
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
 
                // Complete the transport side and wait for the connection to close
                testConnection.CompleteFromTransport();
                await closed.Task.DefaultTimeout();
 
                // We should be stopped now
                var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo")).DefaultTimeout();
                Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
            });
        }
 
        [Fact]
        public async Task TransportCompletionWhileShuttingDownIsNoOp()
        {
            var testConnection = new TestConnection();
            var connectionClosed = new TaskCompletionSource();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                connection.Closed += (e) =>
                {
                    connectionClosed.TrySetResult();
                    return Task.CompletedTask;
                };
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
 
                // Start shutting down and complete the transport side
                var stopTask = connection.StopAsync();
                testConnection.CompleteFromTransport();
 
                // Wait for the connection to close.
                await testConnection.Transport.Input.CompleteAsync();
 
                // The stop should be completed.
                await stopTask.DefaultTimeout();
 
                // The HubConnection should now be closed.
                await connectionClosed.Task.DefaultTimeout();
 
                // We should be stopped now
                var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo")).DefaultTimeout();
                Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
 
                await testConnection.Disposed.DefaultTimeout();
 
                Assert.Equal(1, testConnection.DisposeCount);
            });
        }
 
        [Fact]
        public async Task StopAsyncDuringUnderlyingConnectionCloseWaitsAndNoOps()
        {
            var testConnection = new TestConnection();
            var connectionClosed = new TaskCompletionSource();
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                connection.Closed += (e) =>
                {
                    connectionClosed.TrySetResult();
                    return Task.CompletedTask;
                };
 
                await connection.StartAsync().DefaultTimeout();
                Assert.True(testConnection.Started.IsCompleted);
 
                // Complete the transport side and wait for the connection to close
                testConnection.CompleteFromTransport();
 
                // Start stopping manually (these can't be synchronized by a Sync Point because the transport is disposed outside the lock)
                var stopTask = connection.StopAsync();
 
                await testConnection.Disposed.DefaultTimeout();
 
                // Wait for the stop task to complete and the closed event to fire
                await stopTask.DefaultTimeout();
                await connectionClosed.Task.DefaultTimeout();
 
                // We should be stopped now
                var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo")).DefaultTimeout();
                Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
            });
        }
 
        [Theory]
        [MemberData(nameof(MethodsNamesThatRequireActiveConnection))]
        public async Task MethodsThatRequireActiveConnectionWaitForStopAndFailIfConnectionIsCurrentlyStopping(string methodName)
        {
            var method = MethodsThatRequireActiveConnection[methodName];
 
            // Set up StartAsync to wait on the syncPoint when starting
            var testConnection = new TestConnection(onDispose: SyncPoint.Create(out var syncPoint));
            await AsyncUsing(CreateHubConnection(testConnection), async connection =>
            {
                await connection.StartAsync().DefaultTimeout();
 
                // Stop and invoke the method. These two aren't synchronizable via a Sync Point any more because the transport is disposed
                // outside the lock :(
                var disposeTask = connection.StopAsync();
 
                // Wait to hit DisposeAsync on TestConnection (which should be after StopAsync has cleared the connection state)
                await syncPoint.WaitForSyncPoint().DefaultTimeout();
 
                var targetTask = method(connection);
 
                // Release the sync point
                syncPoint.Continue();
 
                // Wait for the method to complete, with an expected error.
                var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => targetTask).DefaultTimeout();
                Assert.Equal($"The '{methodName}' method cannot be called if the connection is not active", ex.Message);
 
                await disposeTask.DefaultTimeout();
            });
        }
 
        [Fact]
        public async Task ClientTimesoutWhenHandshakeResponseTakesTooLong()
        {
            var handshakeTimeoutLogged = false;
 
            bool ExpectedErrors(WriteContext writeContext)
            {
                if (writeContext.LoggerName == typeof(HubConnection).FullName)
                {
                    if (writeContext.EventId.Name == "ErrorHandshakeTimedOut")
                    {
                        handshakeTimeoutLogged = true;
                        return true;
                    }
 
                    return writeContext.EventId.Name == "ErrorStartingConnection";
                }
 
                return false;
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var connection = new TestConnection(autoHandshake: false);
                var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
                try
                {
                    hubConnection.HandshakeTimeout = TimeSpan.FromMilliseconds(1);
 
                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => hubConnection.StartAsync()).DefaultTimeout();
                    Assert.Equal(HubConnectionState.Disconnected, hubConnection.State);
                }
                finally
                {
                    await hubConnection.DisposeAsync().DefaultTimeout();
                    await connection.DisposeAsync().DefaultTimeout();
                }
            }
 
            Assert.True(handshakeTimeoutLogged, "The handshake timeout wasn't logged.");
        }
 
        [Fact]
        public async Task StartAsyncWithTriggeredCancellationTokenIsCanceled()
        {
            using (StartVerifiableLog())
            {
                var onStartCalled = false;
                var connection = new TestConnection(onStart: () =>
                {
                    onStartCalled = true;
                    return Task.CompletedTask;
                });
                var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
                try
                {
                    await Assert.ThrowsAsync<TaskCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true))).DefaultTimeout();
                    Assert.False(onStartCalled);
                }
                finally
                {
                    await hubConnection.DisposeAsync().DefaultTimeout();
                    await connection.DisposeAsync().DefaultTimeout();
                }
            }
        }
 
        [Fact]
        public async Task StartAsyncCanTriggerCancellationTokenToCancelHandshake()
        {
            var handshakeCancellationLogged = false;
 
            bool ExpectedErrors(WriteContext writeContext)
            {
                if (writeContext.LoggerName == typeof(HubConnection).FullName)
                {
                    if (writeContext.EventId.Name == "ErrorHandshakeCanceled")
                    {
                        handshakeCancellationLogged = true;
                        return true;
                    }
 
                    return writeContext.EventId.Name == "ErrorStartingConnection";
                }
 
                return false;
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var cts = new CancellationTokenSource();
                TestConnection connection = null;
 
                connection = new TestConnection(autoHandshake: false);
 
                var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
                // We want to make sure the cancellation is because of the token passed to StartAsync
                hubConnection.HandshakeTimeout = Timeout.InfiniteTimeSpan;
 
                try
                {
                    var startTask = hubConnection.StartAsync(cts.Token);
 
                    await connection.ReadSentTextMessageAsync().DefaultTimeout();
                    cts.Cancel();
 
                    // We aren't worried about the exact message and it's localized so asserting it is non-trivial.
                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => startTask).DefaultTimeout();
                }
                finally
                {
                    await hubConnection.DisposeAsync().DefaultTimeout();
                    await connection.DisposeAsync().DefaultTimeout();
                }
            }
 
            Assert.True(handshakeCancellationLogged, "The handshake cancellation wasn't logged.");
        }
 
        [Fact]
        public async Task HubConnectionClosesWithErrorIfTerminatedWithPartialMessage()
        {
            var builder = new HubConnectionBuilder().WithUrl("http://example.com");
            var innerConnection = new TestConnection();
 
            var delegateConnectionFactory = new DelegateConnectionFactory(
                endPoint => innerConnection.StartAsync());
            builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
 
            var hubConnection = builder.Build();
            var closedEventTcs = new TaskCompletionSource<Exception>();
            hubConnection.Closed += e =>
            {
                closedEventTcs.SetResult(e);
                return Task.CompletedTask;
            };
 
            await hubConnection.StartAsync().DefaultTimeout();
 
            await innerConnection.Application.Output.WriteAsync(Encoding.UTF8.GetBytes(new[] { '{' })).DefaultTimeout();
            innerConnection.Application.Output.Complete();
 
            var exception = await closedEventTcs.Task.DefaultTimeout();
            Assert.Equal("Connection terminated while reading a message.", exception.Message);
        }
 
        private static async Task ForceLastInvocationToComplete(TestConnection testConnection)
        {
            // We need to "complete" the invocation
            var message = await testConnection.ReadSentTextMessageAsync();
            var json = JObject.Parse(message); // Gotta remove the record separator.
            await testConnection.ReceiveJsonMessage(new
            {
                type = HubProtocolConstants.CompletionMessageType,
                invocationId = json["invocationId"],
            });
        }
 
        // A helper that we wouldn't want to use in product code, but is fine for testing until IAsyncDisposable arrives :)
        private static async Task AsyncUsing(HubConnection connection, Func<HubConnection, Task> action)
        {
            try
            {
                // Using OrTimeout here will hide any timeout issues in the test :(.
                await action(connection);
            }
            finally
            {
                // Dispose isn't under test here, so fire and forget so that errors/timeouts here don't cause
                // test errors that mask the real errors.
                _ = connection.DisposeAsync();
            }
        }
    }
}