// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.Encodings.Web;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Abstractions;
using Microsoft.AspNetCore.Shared;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using static System.IO.Pipelines.DuplexPipe;
namespace Microsoft.AspNetCore.Http.Connections.Client.Internal;
#pragma warning disable CA2252 // This API requires opting into preview features
internal sealed partial class WebSocketsTransport : ITransport, IStatefulReconnectFeature
#pragma warning restore CA2252 // This API requires opting into preview features
private WebSocket? _webSocket;
private IDuplexPipe? _application;
private WebSocketMessageType _webSocketMessageType;
private readonly ILogger _logger;
private readonly TimeSpan _closeTimeout;
private volatile bool _aborted;
private readonly HttpConnectionOptions _httpConnectionOptions;
private readonly HttpClient? _httpClient;
private CancellationTokenSource _stopCts = default!;
private bool _useStatefulReconnect;
private IDuplexPipe? _transport;
// Used for reconnect (when enabled) to determine if the close was ungraceful or not, reconnect only happens on ungraceful disconnect
// The assumption is that a graceful close was triggered purposefully by either the client or server and a reconnect shouldn't occur
private bool _gracefulClose;
private Func<PipeWriter, Task>? _notifyOnReconnect;
internal Task Running { get; private set; } = Task.CompletedTask;
public PipeReader Input => _transport!.Input;
public PipeWriter Output => _transport!.Output;
#pragma warning disable CA2252 // This API requires opting into preview features
public void OnReconnected(Func<PipeWriter, Task> notifyOnReconnect)
if (_notifyOnReconnect is null)
_notifyOnReconnect = notifyOnReconnect;
var localNotifyOnReconnect = _notifyOnReconnect;
_notifyOnReconnect = async writer =>
await localNotifyOnReconnect(writer).ConfigureAwait(false);
await notifyOnReconnect(writer).ConfigureAwait(false);
#pragma warning restore CA2252 // This API requires opting into preview features
public WebSocketsTransport(HttpConnectionOptions httpConnectionOptions, ILoggerFactory loggerFactory, Func<Task<string?>> accessTokenProvider, HttpClient? httpClient,
bool useStatefulReconnect = false)
_useStatefulReconnect = useStatefulReconnect;
_logger = loggerFactory?.CreateLogger(typeof(WebSocketsTransport)) ?? NullLogger.Instance;
_httpConnectionOptions = httpConnectionOptions ?? new HttpConnectionOptions();
_closeTimeout = _httpConnectionOptions.CloseTimeout;
// We were given an updated delegate from the HttpConnection and we are updating what we have in httpOptions
// options itself is copied object of user's options
_httpConnectionOptions.AccessTokenProvider = accessTokenProvider;
_httpClient = httpClient;
private async ValueTask<WebSocket> DefaultWebSocketFactory(WebSocketConnectionContext context, CancellationToken cancellationToken)
var webSocket = new ClientWebSocket();
var url = context.Uri;
var isBrowser = OperatingSystem.IsBrowser();
if (!isBrowser)
// Full Framework will throw when trying to set the User-Agent header
// So avoid setting it in netstandard2.0 and only set it in netstandard2.1 and higher
webSocket.Options.SetRequestHeader("User-Agent", Constants.UserAgentHeader.ToString());
// Set an alternative user agent header on Full framework
webSocket.Options.SetRequestHeader("X-SignalR-User-Agent", Constants.UserAgentHeader.ToString());
// Set this header so the server auth middleware will set an Unauthorized instead of Redirect status code
// See: https://github.com/aspnet/Security/blob/ff9f145a8e89c9756ea12ff10c6d47f2f7eb345f/src/Microsoft.AspNetCore.Authentication.Cookies/Events/CookieAuthenticationEvents.cs#L42
webSocket.Options.SetRequestHeader("X-Requested-With", "XMLHttpRequest");
if (context.Options != null)
if (context.Options.Headers.Count > 0)
if (isBrowser)
foreach (var header in context.Options.Headers)
webSocket.Options.SetRequestHeader(header.Key, header.Value);
var allowHttp2 = true;
if (!isBrowser)
if (context.Options.Cookies != null)
webSocket.Options.Cookies = context.Options.Cookies;
if (context.Options.ClientCertificates is { Count: > 0 })
if (context.Options.Credentials != null)
webSocket.Options.Credentials = context.Options.Credentials;
// Negotiate Auth isn't supported over HTTP/2 and HttpClient does not gracefully fallback to HTTP/1.1 in that case
// https://github.com/dotnet/runtime/issues/1582
allowHttp2 = false;
var originalProxy = webSocket.Options.Proxy;
if (context.Options.Proxy != null)
webSocket.Options.Proxy = context.Options.Proxy;
if (context.Options.UseDefaultCredentials != null)
webSocket.Options.UseDefaultCredentials = context.Options.UseDefaultCredentials.Value;
if (context.Options.UseDefaultCredentials.Value)
// Negotiate Auth isn't supported over HTTP/2 and HttpClient does not gracefully fallback to HTTP/1.1 in that case
// https://github.com/dotnet/runtime/issues/1582
allowHttp2 = false;
if (webSocket.Options.HttpVersion >= HttpVersion.Version20 && allowHttp2)
// Reset options we set on the users' behalf since they are already on the HttpClient that we're passing to ConnectAsync
// And ConnectAsync will throw if these options are set on the ClientWebSocketOptions
if (ReferenceEquals(webSocket.Options.Cookies, context.Options.Cookies))
webSocket.Options.Cookies = null;
if (IsX509CertificateCollectionEqual(webSocket.Options.ClientCertificates, context.Options.ClientCertificates))
if (ReferenceEquals(webSocket.Options.Credentials, context.Options.Credentials))
webSocket.Options.Credentials = null;
if (webSocket.Options.UseDefaultCredentials == (context.Options.UseDefaultCredentials ?? false))
webSocket.Options.UseDefaultCredentials = false;
if (ReferenceEquals(webSocket.Options.Proxy, context.Options.Proxy))
webSocket.Options.Proxy = originalProxy;
if (!allowHttp2 && webSocket.Options.HttpVersion >= HttpVersion.Version20)
// We shouldn't fallback to HTTP/1.1 if the user explicitly states
if (webSocket.Options.HttpVersionPolicy == HttpVersionPolicy.RequestVersionOrLower)
webSocket.Options.HttpVersion = HttpVersion.Version11;
throw new InvalidOperationException("Negotiate Authentication doesn't work with HTTP/2 or higher.");
static bool IsX509CertificateCollectionEqual(X509CertificateCollection? left, X509CertificateCollection? right)
var leftCount = left?.Count ?? 0;
var rightCount = right?.Count ?? 0;
if (leftCount == rightCount)
for (var i = 0; i < rightCount; ++i)
if (!ReferenceEquals(left![i], right![i]))
return false;
return true;
return false;
if (_httpConnectionOptions.AccessTokenProvider != null
&& webSocket.Options.HttpVersion < HttpVersion.Version20
// Apply access token logic when using HTTP/1.1 because we don't use the AccessTokenHttpMessageHandler via HttpClient unless the user specifies HTTP/2.0 or higher
var accessToken = await _httpConnectionOptions.AccessTokenProvider().ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(accessToken))
// We can't use request headers in the browser, so instead append the token as a query string in that case
if (OperatingSystem.IsBrowser())
var accessTokenEncoded = UrlEncoder.Default.Encode(accessToken);
accessTokenEncoded = "access_token=" + accessTokenEncoded;
url = Utils.AppendQueryString(url, accessTokenEncoded);
webSocket.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
// Only share the HttpClient if the user opts-in to HTTP/2 (or higher)
// This is because there is some non-obvious behavior changes when passing in an invoker to ConnectAsync
// and there isn't really any benefit to sharing the HttpClient in HTTP/1.1
if (webSocket.Options.HttpVersion > HttpVersion.Version11)
await webSocket.ConnectAsync(url, invoker: _httpClient, cancellationToken).ConfigureAwait(false);
await webSocket.ConnectAsync(url, cancellationToken).ConfigureAwait(false);
return webSocket;
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text)
throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat));
_webSocketMessageType = transferFormat == TransferFormat.Binary
? WebSocketMessageType.Binary
: WebSocketMessageType.Text;
var resolvedUrl = ResolveWebSocketsUrl(url);
Log.StartTransport(_logger, transferFormat, resolvedUrl);
var context = new WebSocketConnectionContext(resolvedUrl, _httpConnectionOptions);
var factory = _httpConnectionOptions.WebSocketFactory ?? DefaultWebSocketFactory;
_webSocket = await factory(context, cancellationToken).ConfigureAwait(false);
if (_webSocket == null)
throw new InvalidOperationException("Configured WebSocketFactory did not return a value.");
_stopCts = new CancellationTokenSource();
var isReconnect = false;
if (_transport is null)
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var pair = CreateConnectionPair(_httpConnectionOptions.TransportPipeOptions, _httpConnectionOptions.AppPipeOptions);
_transport = pair.Transport;
_application = pair.Application;
isReconnect = true;
// TODO: Handle TCP connection errors
// https://github.com/SignalR/SignalR/blob/1fba14fa3437e24c204dfaf8a18db3fce8acad3c/src/Microsoft.AspNet.SignalR.Core/Owin/WebSockets/WebSocketHandler.cs#L248-L251
Running = ProcessSocketAsync(_webSocket, url, isReconnect);
private async Task ProcessSocketAsync(WebSocket socket, Uri url, bool isReconnect)
Debug.Assert(_application != null);
using (socket)
// Begin sending and receiving.
var receiving = StartReceiving(socket);
var sending = StartSending(socket, ignoreFirstCanceled: isReconnect);
if (isReconnect)
Debug.Assert(_notifyOnReconnect is not null);
await _notifyOnReconnect.Invoke(_transport!.Output).ConfigureAwait(false);
// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (trigger == receiving)
// We're waiting for the application to finish and there are 2 things it could be doing
// 1. Waiting for application data
// 2. Waiting for a websocket send to complete
// Cancel the application so that ReadAsync yields
var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token)).ConfigureAwait(false);
if (resultTask != sending)
_aborted = true;
// Abort the websocket if we're stuck in a pending send to the client
// Should not throw
await sending.ConfigureAwait(false);
// We're waiting on the websocket to close and there are 2 things it could be doing
// 1. Waiting for websocket data
// 2. Waiting on a flush to complete (backpressure being applied)
_aborted = true;
// Abort the websocket if we're stuck in a pending receive from the client
// Cancel any pending flush so that we can quit
if (_gracefulClose)
// Should not throw
await receiving.ConfigureAwait(false);
var cleanup = true;
if (!_gracefulClose && UpdateConnectionPair())
await StartAsync(url, _webSocketMessageType == WebSocketMessageType.Binary ? TransferFormat.Binary : TransferFormat.Text,
cancellationToken: default).ConfigureAwait(false);
cleanup = false;
catch (Exception ex)
throw new InvalidOperationException("Reconnect attempt failed.", innerException: ex);
if (cleanup)
// Pipes will usually already be completed.
// If stateful reconnect fails we want to make sure the Pipes are cleaned up.
// And in rare cases where the websocket is closing at the same time StopAsync is called
// It's possible a Pipe won't be completed so let's be safe and call Complete again.
private async Task StartReceiving(WebSocket socket)
Debug.Assert(_application != null);
while (true)
// Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
var result = await socket.ReceiveAsync(Memory<byte>.Empty, _stopCts.Token).ConfigureAwait(false);
if (result.MessageType == WebSocketMessageType.Close)
_gracefulClose = true;
Log.WebSocketClosed(_logger, socket.CloseStatus);
if (socket.CloseStatus != WebSocketCloseStatus.NormalClosure)
throw new InvalidOperationException($"Websocket closed with error: {socket.CloseStatus}.");
var memory = _application.Output.GetMemory();
// Because we checked the CloseStatus from the 0 byte read above, we don't need to check again after reading
var receiveResult = await socket.ReceiveAsync(memory, _stopCts.Token).ConfigureAwait(false);
var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment);
// Exceptions are handled above where the send and receive tasks are being run.
var receiveResult = await socket.ReceiveAsync(arraySegment, _stopCts.Token).ConfigureAwait(false);
#error TFMs need to be updated
// Need to check again for netstandard2.1 because a close can happen between a 0-byte read and the actual read
if (receiveResult.MessageType == WebSocketMessageType.Close)
_gracefulClose = true;
Log.WebSocketClosed(_logger, socket.CloseStatus);
if (socket.CloseStatus != WebSocketCloseStatus.NormalClosure)
throw new InvalidOperationException($"Websocket closed with error: {socket.CloseStatus}.");
Log.MessageReceived(_logger, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);
var flushResult = await _application.Output.FlushAsync().ConfigureAwait(false);
// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
catch (OperationCanceledException)
catch (Exception ex)
if (!_aborted)
if (_gracefulClose || !_useStatefulReconnect)
// only logging in this case because the other case gets the exception flowed to application code
Log.ReceiveErrored(_logger, ex);
// We're done writing
if (_gracefulClose || !_useStatefulReconnect)
private async Task StartSending(WebSocket socket, bool ignoreFirstCanceled)
Debug.Assert(_application != null);
Exception? error = null;
while (true)
var result = await _application.Input.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
// Get a frame from the application
if (result.IsCanceled && !ignoreFirstCanceled)
ignoreFirstCanceled = false;
if (!buffer.IsEmpty)
Log.ReceivedFromApp(_logger, buffer.Length);
if (WebSocketCanSend(socket))
await socket.SendAsync(buffer, _webSocketMessageType, _stopCts.Token).ConfigureAwait(false);
catch (Exception ex)
if (!_aborted)
Log.ErrorSendingMessage(_logger, ex);
else if (result.IsCompleted)
catch (Exception ex)
error = ex;
if (WebSocketCanSend(socket))
if (!OperatingSystem.IsBrowser())
// We're done sending, send the close frame to the client if the websocket is still open
await socket.CloseOutputAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", _stopCts.Token).ConfigureAwait(false);
// WebSocket in the browser doesn't have an equivalent to CloseOutputAsync, it just calls CloseAsync and logs a warning
// So let's just call CloseAsync to avoid the warning
await socket.CloseAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", _stopCts.Token).ConfigureAwait(false);
catch (Exception ex)
Log.ClosingWebSocketFailed(_logger, ex);
if (_gracefulClose || !_useStatefulReconnect)
if (error is not null)
Log.SendErrored(_logger, error);
private static bool WebSocketCanSend(WebSocket ws)
return !(ws.State == WebSocketState.Aborted ||
ws.State == WebSocketState.Closed ||
ws.State == WebSocketState.CloseSent);
private static Uri ResolveWebSocketsUrl(Uri url)
var uriBuilder = new UriBuilder(url);
if (url.Scheme == "http")
uriBuilder.Scheme = "ws";
else if (url.Scheme == "https")
uriBuilder.Scheme = "wss";
return uriBuilder.Uri;
public async Task StopAsync()
_gracefulClose = true;
if (_application == null)
// We never started
// Cancel any pending reads from the application, this should start the entire shutdown process
// Start ungraceful close timer
await Running.ConfigureAwait(false);
catch (Exception ex)
Log.TransportStopped(_logger, ex);
// exceptions have been handled in the Running task continuation by closing the channel with the exception
Log.TransportStopped(_logger, null);
private bool UpdateConnectionPair()
lock (this)
// Lock and check _useStatefulReconnect, we want to swap the Pipe completely before DisableReconnect returns if there is contention there.
// The calling code will start completing the transport after DisableReconnect
// so we want to avoid any possibility of the new Pipe staying alive or even worse a new WebSocket connection being open when the transport
// might think it's closed.
if (_useStatefulReconnect == false)
return false;
var input = new Pipe(_httpConnectionOptions.TransportPipeOptions);
// Add new pipe for reading from and writing to transport from app code
var transportToApplication = new DuplexPipe(_transport!.Input, input.Writer);
var applicationToTransport = new DuplexPipe(input.Reader, _application!.Output);
_application = applicationToTransport;
_transport = transportToApplication;
return true;
#pragma warning disable CA2252 // This API requires opting into preview features
public void DisableReconnect()
#pragma warning restore CA2252 // This API requires opting into preview features
lock (this)
_useStatefulReconnect = false;