File: HubConnectionTests.Reconnect.cs
Web Access
Project: src\src\SignalR\clients\csharp\Client\test\UnitTests\Microsoft.AspNetCore.SignalR.Client.Tests.csproj (Microsoft.AspNetCore.SignalR.Client.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.InternalTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Moq;
using Xunit;
 
namespace Microsoft.AspNetCore.SignalR.Client.Tests;
 
public partial class HubConnectionTests
{
    public class Reconnect : VerifiableLoggedTest
    {
        [Fact]
        public async Task IsNotEnabledByDefault()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ShutdownWithError" ||
                        writeContext.EventId.Name == "ServerDisconnectedWithError");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var exception = new Exception();
 
                var testConnection = new TestConnection();
                await using var hubConnection = CreateHubConnection(testConnection, loggerFactory: LoggerFactory);
 
                var reconnectingCalled = false;
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCalled = true;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                testConnection.CompleteFromTransport(exception);
 
                Assert.Same(exception, await closedErrorTcs.Task.DefaultTimeout());
                Assert.False(reconnectingCalled);
            }
        }
 
        [Fact]
        public async Task CanBeOptedInto()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError");
            }
 
            var failReconnectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = default(ReconnectingConnectionFactory);
                var startCallCount = 0;
                var originalConnectionId = "originalConnectionId";
                var reconnectedConnectionId = "reconnectedConnectionId";
 
                async Task OnTestConnectionStart()
                {
                    startCallCount++;
 
                    // Only fail the first reconnect attempt.
                    if (startCallCount == 2)
                    {
                        await failReconnectTcs.Task;
                    }
 
                    var testConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
 
                    // Change the connection id before reconnecting.
                    if (startCallCount == 3)
                    {
                        testConnection.ConnectionId = reconnectedConnectionId;
                    }
                    else
                    {
                        testConnection.ConnectionId = originalConnectionId;
                    }
                }
 
                testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(OnTestConnectionStart));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    reconnectedConnectionIdTcs.SetResult(connectionId);
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                Assert.Same(originalConnectionId, hubConnection.ConnectionId);
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                var reconnectException = new Exception();
                failReconnectTcs.SetException(reconnectException);
 
                Assert.Same(reconnectedConnectionId, await reconnectedConnectionIdTcs.Task.DefaultTimeout());
 
                Assert.Equal(2, retryContexts.Count);
                Assert.Same(reconnectException, retryContexts[1].RetryReason);
                Assert.Equal(1, retryContexts[1].PreviousRetryCount);
                Assert.True(TimeSpan.Zero <= retryContexts[1].ElapsedTime);
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.Null(closeError);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task StopsIfTheReconnectPolicyReturnsNull()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError");
            }
 
            var failReconnectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var startCallCount = 0;
 
                Task OnTestConnectionStart()
                {
                    startCallCount++;
 
                    // Fail the first reconnect attempts.
                    if (startCallCount > 1)
                    {
                        return failReconnectTcs.Task;
                    }
 
                    return Task.CompletedTask;
                }
 
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(OnTestConnectionStart));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return context.PreviousRetryCount == 0 ? TimeSpan.Zero : (TimeSpan?)null;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                var reconnectException = new Exception();
                failReconnectTcs.SetException(reconnectException);
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.IsType<OperationCanceledException>(closeError);
 
                Assert.Equal(2, retryContexts.Count);
                Assert.Same(reconnectException, retryContexts[1].RetryReason);
                Assert.Equal(1, retryContexts[1].PreviousRetryCount);
                Assert.True(TimeSpan.Zero <= retryContexts[1].ElapsedTime);
 
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(0, reconnectedCount);
            }
        }
 
        [Fact]
        [LogLevel(LogLevel.Trace)]
        public async Task HasCorrectRetryNumberAfterRetriesExhausted()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError");
            }
 
            var failReconnectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
            using (var logCollector = StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var startCallCount = 0;
 
                Task OnTestConnectionStart()
                {
                    startCallCount++;
 
                    // Fail the first reconnect attempts.
                    if (startCallCount > 1)
                    {
                        return failReconnectTcs.Task;
                    }
 
                    return Task.CompletedTask;
                }
 
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(OnTestConnectionStart));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return context.PreviousRetryCount == 0 ? TimeSpan.Zero : (TimeSpan?)null;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
 
                var reconnectException = new Exception();
                failReconnectTcs.SetException(reconnectException);
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.IsType<OperationCanceledException>(closeError);
 
                Assert.Contains($"after {reconnectingCount} failed attempts", closeError.Message);
 
                var logs = logCollector.GetLogs();
                var attemptsLog = logs.SingleOrDefault(r => r.Write.EventId.Name == "ReconnectAttemptsExhausted");
                Assert.NotNull(attemptsLog);
                Assert.Contains($"after {reconnectingCount} failed attempts", attemptsLog.Write.Message);
                Assert.Equal(LogLevel.Information, attemptsLog.Write.LogLevel);
 
                var waitingLog = logs.SingleOrDefault(r => r.Write.EventId.Name == "AwaitingReconnectRetryDelay");
                Assert.NotNull(waitingLog);
                Assert.Contains($"Reconnect attempt number 1 will start in ", waitingLog.Write.Message);
                Assert.Equal(LogLevel.Trace, waitingLog.Write.LogLevel);
 
                Assert.Equal(0, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task CanHappenMultipleTimes()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory();
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    reconnectedConnectionIdTcs.SetResult(connectionId);
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                await reconnectedConnectionIdTcs.Task.DefaultTimeout();
 
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
                Assert.Equal(TaskStatus.WaitingForActivation, closedErrorTcs.Task.Status);
 
                reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                var secondException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(secondException);
 
                Assert.Same(secondException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Equal(2, retryContexts.Count);
                Assert.Same(secondException, retryContexts[1].RetryReason);
                Assert.Equal(0, retryContexts[1].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[1].ElapsedTime);
 
                await reconnectedConnectionIdTcs.Task.DefaultTimeout();
 
                Assert.Equal(2, reconnectingCount);
                Assert.Equal(2, reconnectedCount);
                Assert.Equal(TaskStatus.WaitingForActivation, closedErrorTcs.Task.Status);
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.Null(closeError);
                Assert.Equal(2, reconnectingCount);
                Assert.Equal(2, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task CanBeInducedByCloseMessageWithAllowReconnectSet()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ReceivedCloseWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = default(ReconnectingConnectionFactory);
 
                testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    reconnectedConnectionIdTcs.SetResult(connectionId);
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentConnection.ReceiveJsonMessage(new
                {
                    type = HubProtocolConstants.CloseMessageType,
                    error = "Error!",
                    allowReconnect = true,
                });
 
                var reconnectingException = await reconnectingErrorTcs.Task.DefaultTimeout();
                var expectedMessage = "The server closed the connection with the following error: Error!";
 
                Assert.Equal(expectedMessage, reconnectingException.Message);
                Assert.Single(retryContexts);
                Assert.Equal(expectedMessage, retryContexts[0].RetryReason.Message);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                await reconnectedConnectionIdTcs.Task.DefaultTimeout();
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.Null(closeError);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task CannotBeInducedByCloseMessageWithAllowReconnectOmitted()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ReceivedCloseWithError" ||
                        writeContext.EventId.Name == "ShutdownWithError");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = default(ReconnectingConnectionFactory);
 
                testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var reconnectingCount = 0;
                var nextRetryDelayCallCount = 0;
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    nextRetryDelayCallCount++;
                    return TimeSpan.Zero;
                });
 
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentConnection.ReceiveJsonMessage(new
                {
                    type = HubProtocolConstants.CloseMessageType,
                    error = "Error!",
                });
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
 
                Assert.Equal("The server closed the connection with the following error: Error!", closeError.Message);
                Assert.Equal(0, nextRetryDelayCallCount);
                Assert.Equal(0, reconnectingCount);
            }
        }
 
        [Fact]
        public async Task EventsNotFiredIfFirstRetryDelayIsNull()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       writeContext.EventId.Name == "ServerDisconnectedWithError";
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory();
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<TimeSpan?>(null);
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                await closedErrorTcs.Task.DefaultTimeout();
 
                Assert.Equal(0, reconnectingCount);
                Assert.Equal(0, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task DoesNotStartIfConnectionIsLostDuringInitialHandshake()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ErrorReceivingHandshakeResponse" ||
                        writeContext.EventId.Name == "ErrorStartingConnection");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(autoHandshake: false));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<TimeSpan?>(null);
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var closedCount = 0;
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedCount++;
                    return Task.CompletedTask;
                };
 
                var startTask = hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await Assert.ThrowsAsync<Exception>(() => startTask).DefaultTimeout());
                Assert.Equal(HubConnectionState.Disconnected, hubConnection.State);
                Assert.Equal(0, reconnectingCount);
                Assert.Equal(0, reconnectedCount);
                Assert.Equal(0, closedCount);
            }
        }
 
        [Fact]
        public async Task ContinuesIfConnectionLostDuringReconnectHandshake()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError" ||
                        writeContext.EventId.Name == "ErrorReceivingHandshakeResponse" ||
                        writeContext.EventId.Name == "ErrorStartingConnection");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(autoHandshake: false));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var secondRetryDelayTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
 
                    if (retryContexts.Count == 2)
                    {
                        secondRetryDelayTcs.SetResult();
                    }
 
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    reconnectedConnectionIdTcs.SetResult(connectionId);
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                var startTask = hubConnection.StartAsync();
 
                // Complete handshake
                var currentTestConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentTestConnection.ReadHandshakeAndSendResponseAsync().DefaultTimeout();
 
                await startTask.DefaultTimeout();
 
                var firstException = new Exception();
                currentTestConnection.CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                var secondException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(secondException);
 
                await secondRetryDelayTcs.Task.DefaultTimeout();
 
                Assert.Equal(2, retryContexts.Count);
                Assert.Same(secondException, retryContexts[1].RetryReason);
                Assert.Equal(1, retryContexts[1].PreviousRetryCount);
                Assert.True(TimeSpan.Zero <= retryContexts[0].ElapsedTime);
 
                // Complete handshake
                currentTestConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentTestConnection.ReadHandshakeAndSendResponseAsync().DefaultTimeout();
                await reconnectedConnectionIdTcs.Task.DefaultTimeout();
 
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
                Assert.Equal(TaskStatus.WaitingForActivation, closedErrorTcs.Task.Status);
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.Null(closeError);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task ContinuesIfInvalidHandshakeResponse()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError" ||
                        writeContext.EventId.Name == "ErrorReceivingHandshakeResponse" ||
                        writeContext.EventId.Name == "HandshakeServerError" ||
                        writeContext.EventId.Name == "ErrorStartingConnection");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(autoHandshake: false));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var secondRetryDelayTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
 
                    if (retryContexts.Count == 2)
                    {
                        secondRetryDelayTcs.SetResult();
                    }
 
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    reconnectedConnectionIdTcs.SetResult(connectionId);
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                var startTask = hubConnection.StartAsync();
 
                // Complete handshake
                var currentTestConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentTestConnection.ReadHandshakeAndSendResponseAsync().DefaultTimeout();
 
                await startTask.DefaultTimeout();
 
                var firstException = new Exception();
                currentTestConnection.CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                // Respond to handshake with error.
                currentTestConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentTestConnection.ReadSentTextMessageAsync().DefaultTimeout();
 
                var output = MemoryBufferWriter.Get();
                try
                {
                    HandshakeProtocol.WriteResponseMessage(new HandshakeResponseMessage("Error!"), output);
                    await currentTestConnection.Application.Output.WriteAsync(output.ToArray()).DefaultTimeout();
                }
                finally
                {
                    MemoryBufferWriter.Return(output);
                }
 
                await secondRetryDelayTcs.Task.DefaultTimeout();
 
                Assert.Equal(2, retryContexts.Count);
                Assert.IsType<HubException>(retryContexts[1].RetryReason);
                Assert.Equal(1, retryContexts[1].PreviousRetryCount);
                Assert.True(TimeSpan.Zero <= retryContexts[0].ElapsedTime);
 
                // Complete handshake
 
                currentTestConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
                await currentTestConnection.ReadHandshakeAndSendResponseAsync().DefaultTimeout();
                await reconnectedConnectionIdTcs.Task.DefaultTimeout();
 
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
                Assert.Equal(TaskStatus.WaitingForActivation, closedErrorTcs.Task.Status);
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                var closeError = await closedErrorTcs.Task.DefaultTimeout();
                Assert.Null(closeError);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(1, reconnectedCount);
            }
        }
 
        [Fact]
        public async Task CanBeStoppedWhileRestartingUnderlyingConnection()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError" ||
                        writeContext.EventId.Name == "ErrorHandshakeCanceled" ||
                        writeContext.EventId.Name == "ErrorStartingConnection");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var connectionStartTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
                async Task OnTestConnectionStart()
                {
                    try
                    {
                        await connectionStartTcs.Task;
                    }
                    finally
                    {
                        connectionStartTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
                    }
                }
 
                var testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection(OnTestConnectionStart));
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    return TimeSpan.Zero;
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                // Allow the first connection to start successfully.
                connectionStartTcs.SetResult();
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                var secondException = new Exception();
                var stopTask = hubConnection.StopAsync();
                connectionStartTcs.SetResult();
 
                Assert.IsType<OperationCanceledException>(await closedErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(0, reconnectedCount);
                await stopTask.DefaultTimeout();
            }
        }
 
        [Fact]
        public async Task CanBeStoppedDuringRetryDelay()
        {
            bool ExpectedErrors(WriteContext writeContext)
            {
                return writeContext.LoggerName == typeof(HubConnection).FullName &&
                       (writeContext.EventId.Name == "ServerDisconnectedWithError" ||
                        writeContext.EventId.Name == "ReconnectingWithError" ||
                        writeContext.EventId.Name == "ErrorReceivingHandshakeResponse" ||
                        writeContext.EventId.Name == "ErrorStartingConnection");
            }
 
            using (StartVerifiableLog(ExpectedErrors))
            {
                var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
                var testConnectionFactory = new ReconnectingConnectionFactory();
                builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
 
                var retryContexts = new List<RetryContext>();
                var mockReconnectPolicy = new Mock<IRetryPolicy>();
                mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
                {
                    retryContexts.Add(context);
                    // Hopefully this test never takes over a minute.
                    return TimeSpan.FromMinutes(1);
                });
                builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
 
                await using var hubConnection = builder.Build();
                var reconnectingCount = 0;
                var reconnectedCount = 0;
                var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
                var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                hubConnection.Reconnecting += error =>
                {
                    reconnectingCount++;
                    reconnectingErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                hubConnection.Reconnected += connectionId =>
                {
                    reconnectedCount++;
                    return Task.CompletedTask;
                };
 
                hubConnection.Closed += error =>
                {
                    closedErrorTcs.SetResult(error);
                    return Task.CompletedTask;
                };
 
                // Allow the first connection to start successfully.
                await hubConnection.StartAsync().DefaultTimeout();
 
                var firstException = new Exception();
                (await testConnectionFactory.GetNextOrCurrentTestConnection()).CompleteFromTransport(firstException);
 
                Assert.Same(firstException, await reconnectingErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Same(firstException, retryContexts[0].RetryReason);
                Assert.Equal(0, retryContexts[0].PreviousRetryCount);
                Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
 
                await hubConnection.StopAsync().DefaultTimeout();
 
                Assert.IsType<OperationCanceledException>(await closedErrorTcs.Task.DefaultTimeout());
                Assert.Single(retryContexts);
                Assert.Equal(1, reconnectingCount);
                Assert.Equal(0, reconnectedCount);
            }
        }
 
        private class ReconnectingConnectionFactory : IConnectionFactory
        {
            public readonly Func<TestConnection> _testConnectionFactory;
            public TaskCompletionSource<TestConnection> _testConnectionTcs = new TaskCompletionSource<TestConnection>(TaskCreationOptions.RunContinuationsAsynchronously);
 
            public ReconnectingConnectionFactory()
                : this(() => new TestConnection())
            {
            }
 
            public ReconnectingConnectionFactory(Func<TestConnection> testConnectionFactory)
            {
                _testConnectionFactory = testConnectionFactory;
            }
 
            public Task<TestConnection> GetNextOrCurrentTestConnection()
            {
                return _testConnectionTcs.Task;
            }
 
            public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default)
            {
                var testConnection = _testConnectionFactory();
 
                _testConnectionTcs.SetResult(testConnection);
 
                try
                {
                    return new DisposeInterceptingConnectionContextDecorator(await testConnection.StartAsync(), this);
                }
                catch
                {
                    _testConnectionTcs = new TaskCompletionSource<TestConnection>(TaskCreationOptions.RunContinuationsAsynchronously);
                    throw;
                }
            }
 
            public async Task DisposeAsync(ConnectionContext connection)
            {
                var disposingTestConnection = await _testConnectionTcs.Task;
 
                _testConnectionTcs = new TaskCompletionSource<TestConnection>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                await disposingTestConnection.DisposeAsync();
            }
        }
 
        private class DisposeInterceptingConnectionContextDecorator : ConnectionContext
        {
            private readonly ConnectionContext _inner;
            private readonly ReconnectingConnectionFactory _reconnectingConnectionFactory;
 
            public DisposeInterceptingConnectionContextDecorator(ConnectionContext inner, ReconnectingConnectionFactory reconnectingConnectionFactory)
            {
                _inner = inner;
                _reconnectingConnectionFactory = reconnectingConnectionFactory;
            }
 
            public override string ConnectionId { get => _inner.ConnectionId; set => _inner.ConnectionId = value; }
            public override IFeatureCollection Features => _inner.Features;
            public override IDictionary<object, object> Items { get => _inner.Items; set => _inner.Items = value; }
            public override IDuplexPipe Transport { get => _inner.Transport; set => _inner.Transport = value; }
            public override CancellationToken ConnectionClosed { get => _inner.ConnectionClosed; set => _inner.ConnectionClosed = value; }
            public override EndPoint LocalEndPoint { get => _inner.LocalEndPoint; set => _inner.LocalEndPoint = value; }
            public override EndPoint RemoteEndPoint { get => _inner.RemoteEndPoint; set => _inner.RemoteEndPoint = value; }
 
            public override void Abort(ConnectionAbortedException abortReason) => _inner.Abort(abortReason);
            public override void Abort() => _inner.Abort();
 
            public override ValueTask DisposeAsync() => new ValueTask(_reconnectingConnectionFactory.DisposeAsync(_inner));
        }
    }
}