File: DiagnosticsServerRouter\DiagnosticsServerRouterFactory.cs
Web Access
Project: src\src\diagnostics\src\Microsoft.Diagnostics.NETCore.Client\Microsoft.Diagnostics.NETCore.Client.csproj (Microsoft.Diagnostics.NETCore.Client)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Microsoft.Diagnostics.NETCore.Client
{
    internal class RuntimeTimeoutException : TimeoutException
    {
        public RuntimeTimeoutException(int TimeoutMs)
            : base(string.Format("No new runtime endpoints connected, waited {0} ms", TimeoutMs))
        { }
    }

    internal class BackendStreamTimeoutException : TimeoutException
    {
        public BackendStreamTimeoutException(int TimeoutMs)
            : base(string.Format("No new back end streams available, waited {0} ms", TimeoutMs))
        { }
    }

    /// <summary>
    /// Base class representing a Diagnostics Server router factory.
    /// </summary>
    internal class DiagnosticsServerRouterFactory
    {
        private int IsStreamConnectedTimeoutMs { get; set; } = 500;

        public virtual string IpcAddress { get; }

        public virtual string TcpAddress { get; }

        public virtual ILogger Logger { get; }

        public virtual Task Start(CancellationToken token)
        {
            throw new NotImplementedException();
        }

        public virtual Task Stop()
        {
            throw new NotImplementedException();
        }

        public virtual void Reset()
        {
            throw new NotImplementedException();
        }

        public virtual Task<Router> CreateRouterAsync(CancellationToken token)
        {
            throw new NotImplementedException();
        }

        protected static bool IsStreamConnected(Stream stream)
        {
            bool connected = true;

            if (stream is NamedPipeServerStream or NamedPipeClientStream)
            {
                PipeStream pipeStream = stream as PipeStream;

                // PeekNamedPipe will return false if the pipe is disconnected/broken.
                connected = NativeMethods.PeekNamedPipe(
                    pipeStream.SafePipeHandle,
                    null,
                    0,
                    IntPtr.Zero,
                    IntPtr.Zero,
                    IntPtr.Zero);
            }
            else if (stream is ExposedSocketNetworkStream networkStream)
            {
                bool blockingState = networkStream.Socket.Blocking;
                try
                {
                    // Check connection read state by peek one byte. Will return 0 in case connection is closed.
                    // A closed connection could also raise exception, but then socket connected state should
                    // be set to false.
                    networkStream.Socket.Blocking = false;
                    if (networkStream.Socket.Receive(new byte[1], 0, 1, System.Net.Sockets.SocketFlags.Peek) == 0)
                    {
                        connected = false;
                    }

                    // Check connection write state by sending non-blocking zero-byte data.
                    // A closed connection should raise exception, but then socket connected state should
                    // be set to false.
                    if (connected)
                    {
                        networkStream.Socket.Send(Array.Empty<byte>(), 0, System.Net.Sockets.SocketFlags.None);
                    }
                }
                catch (Exception)
                {
                    connected = networkStream.Socket.Connected;
                }
                finally
                {
                    networkStream.Socket.Blocking = blockingState;
                }
            }
            else
            {
                connected = stream is WebSocketServer.IWebSocketStreamAdapter adapter ? adapter.IsConnected : false;
            }

            return connected;
        }

        protected async Task IsStreamConnectedAsync(Stream stream, CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                // Check if tcp stream connection is still available.
                if (!DiagnosticsServerRouterFactory.IsStreamConnected(stream))
                {
                    throw new EndOfStreamException();
                }

                try
                {
                    // Wait before rechecking connection.
                    await Task.Delay(IsStreamConnectedTimeoutMs, token).ConfigureAwait(false);
                }
                catch { }
            }
        }

        protected static bool IsCompletedSuccessfully(Task t)
        {
#if NETCOREAPP2_0_OR_GREATER
            return t.IsCompletedSuccessfully;
#else
            return t.IsCompleted && !t.IsCanceled && !t.IsFaulted;
#endif
        }
    }

    /// <summary>
    /// This is a common base class for network-based server endpoints used when building router instances.
    /// </summary>
    /// <remarks>
    /// We have two subclases: for normal TCP/IP sockets, and another for WebSocket connections.
    /// </remarks>
    internal abstract class NetServerRouterFactory : IIpcServerTransportCallbackInternal
    {
        public delegate NetServerRouterFactory CreateInstanceDelegate(string webSocketURL, int runtimeTimeoutMs, ILogger logger);

        private readonly ILogger _logger;
        private IpcEndpointInfo _netServerEndpointInfo;
        public abstract void CreatedNewServer(EndPoint localEP);


        protected ILogger Logger => _logger;

        protected int RuntimeTimeoutMs { get; private set; } = 60000;
        protected int NetServerTimeoutMs { get; set; } = 5000;

        private bool _auto_shutdown;

        protected bool IsAutoShutdown => _auto_shutdown;

        protected IpcEndpointInfo NetServerEndpointInfo
        {
            get => _netServerEndpointInfo;
            private set { _netServerEndpointInfo = value; }
        }


        protected IpcEndpoint Endpoint => NetServerEndpointInfo.Endpoint;
        public Guid RuntimeInstanceId => NetServerEndpointInfo.RuntimeInstanceCookie;
        public int RuntimeProcessId => NetServerEndpointInfo.ProcessId;

        protected void ResetEnpointInfo()
        {
            NetServerEndpointInfo = default(IpcEndpointInfo);
        }

        protected NetServerRouterFactory(int runtimeTimeoutMs, ILogger logger)
        {
            _logger = logger;
            _auto_shutdown = runtimeTimeoutMs != Timeout.Infinite;
            if (runtimeTimeoutMs != Timeout.Infinite)
            {
                RuntimeTimeoutMs = runtimeTimeoutMs;
            }

            _netServerEndpointInfo = default(IpcEndpointInfo);

        }

        /// <summary>
        /// Subclasses should return a human and machine readable address of the server.
        /// For TCP this should be something that can be passed as an address in DOTNET_DiagnosticPorts, for WebSocket it could be a URI.
        /// </summary>
        public abstract string ServerAddress { get; }
        /// <summary>
        /// Subclasses should return a human readable description of the server connection type ("tcp", "WebSocket", etc)
        /// </summary>
        public abstract string ServerTransportName { get; }

        protected abstract Task<IpcEndpointInfo> AcceptAsyncImpl(CancellationToken token);

        public abstract void Start();
        public abstract Task Stop();
        public abstract void Reset();

        public async Task<Stream> AcceptNetStreamAsync(CancellationToken token)
        {
            Stream netServerStream;

            Logger?.LogDebug($"Waiting for a new {ServerTransportName} connection at endpoint \"{ServerAddress}\".");

            if (Endpoint == null)
            {
                using CancellationTokenSource acceptTimeoutTokenSource = new();
                using CancellationTokenSource acceptTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, acceptTimeoutTokenSource.Token);

                try
                {
                    // If no new runtime instance connects, timeout.
                    acceptTimeoutTokenSource.CancelAfter(RuntimeTimeoutMs);
                    NetServerEndpointInfo = await AcceptAsyncImpl(acceptTokenSource.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    if (acceptTimeoutTokenSource.IsCancellationRequested)
                    {
                        Logger?.LogDebug("No runtime instance connected before timeout.");

                        if (IsAutoShutdown)
                        {
                            throw new RuntimeTimeoutException(RuntimeTimeoutMs);
                        }
                    }

                    throw;
                }
            }

            using CancellationTokenSource connectTimeoutTokenSource = new();
            using CancellationTokenSource connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);

            try
            {
                // Get next connected tcp stream. Should timeout if no endpoint appears within timeout.
                // If that happens we need to remove endpoint since it might indicate a unresponsive runtime.
                connectTimeoutTokenSource.CancelAfter(NetServerTimeoutMs);
                netServerStream = await Endpoint.ConnectAsync(connectTokenSource.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                if (connectTimeoutTokenSource.IsCancellationRequested)
                {
                    Logger?.LogDebug($"No {ServerTransportName} stream connected before timeout.");
                    throw new BackendStreamTimeoutException(NetServerTimeoutMs);
                }

                throw;
            }

            if (netServerStream != null)
            {
                Logger?.LogDebug($"Successfully connected {ServerTransportName} stream, runtime id={RuntimeInstanceId}, runtime pid={RuntimeProcessId}.");
            }

            return netServerStream;
        }
    }

    /// <summary>
    /// This class represent a TCP/IP server endpoint used when building up router instances.
    /// </summary>
    internal class TcpServerRouterFactory : NetServerRouterFactory
    {
        private string _tcpServerAddress;
        private ReversedDiagnosticsServer _tcpServer;

        public string TcpServerAddress
        {
            get { return _tcpServerAddress; }
        }

        public static TcpServerRouterFactory CreateDefaultInstance(string tcpServer, int runtimeTimeoutMs, ILogger logger)
        {
            return new TcpServerRouterFactory(tcpServer, runtimeTimeoutMs, logger);
        }

        public TcpServerRouterFactory(string tcpServer, int runtimeTimeoutMs, ILogger logger) : base(runtimeTimeoutMs, logger)
        {
            _tcpServerAddress = IpcTcpSocketEndPoint.NormalizeTcpIpEndPoint(string.IsNullOrEmpty(tcpServer) ? "127.0.0.1:0" : tcpServer);

            _tcpServer = new ReversedDiagnosticsServer(_tcpServerAddress, ReversedDiagnosticsServer.Kind.Tcp);
            _tcpServer.TransportCallback = this;
        }

        public override void Start()
        {
            _tcpServer.Start();
        }

        public override async Task Stop()
        {
            await _tcpServer.DisposeAsync().ConfigureAwait(false);
        }

        public override void Reset()
        {
            if (Endpoint != null)
            {
                _tcpServer.RemoveConnection(NetServerEndpointInfo.RuntimeInstanceCookie);
                ResetEnpointInfo();
            }
        }

        protected override Task<IpcEndpointInfo> AcceptAsyncImpl(CancellationToken token) => _tcpServer.AcceptAsync(token);
        public override string ServerAddress => _tcpServerAddress;
        public override string ServerTransportName => "TCP";

        public override void CreatedNewServer(EndPoint localEP)
        {
            if (localEP is IPEndPoint ipEP)
            {
                _tcpServerAddress = _tcpServerAddress.Replace(":0", string.Format(":{0}", ipEP.Port));
            }
        }
    }

    /// <summary>
    /// This class represent a WebSocket server endpoint used when building up router instances.
    /// </summary>
    internal class WebSocketServerRouterFactory : NetServerRouterFactory
    {

        private readonly string _webSocketURL;
        private ReversedDiagnosticsServer _webSocketServer;

        public string WebSocketURL => _webSocketURL;

        public static WebSocketServerRouterFactory CreateDefaultInstance(string webSocketURL, int runtimeTimeoutMs, ILogger logger)
        {
            return new WebSocketServerRouterFactory(webSocketURL, runtimeTimeoutMs, logger);
        }

        public WebSocketServerRouterFactory(string webSocketURL, int runtimeTimeoutMs, ILogger logger) : base(runtimeTimeoutMs, logger)
        {
            Debug.Assert(!string.IsNullOrEmpty(webSocketURL));

            _webSocketURL = webSocketURL;

            _webSocketServer = new ReversedDiagnosticsServer(_webSocketURL, ReversedDiagnosticsServer.Kind.WebSocket, TimeSpan.FromMilliseconds(750));
            _webSocketServer.TransportCallback = this;
        }

        public override void Start()
        {
            _webSocketServer.Start();
        }

        public override async Task Stop()
        {
            await _webSocketServer.DisposeAsync().ConfigureAwait(false);
        }

        public override void Reset()
        {
            if (Endpoint != null)
            {
                _webSocketServer.RemoveConnection(NetServerEndpointInfo.RuntimeInstanceCookie);
                ResetEnpointInfo();
            }
        }

        protected override Task<IpcEndpointInfo> AcceptAsyncImpl(CancellationToken token) => _webSocketServer.AcceptAsync(token);
        public override string ServerAddress => WebSocketURL;
        public override string ServerTransportName => "WebSocket";

        public override void CreatedNewServer(EndPoint localEP)
        {
        }

    }

    /// <summary>
    /// This class represent a TCP/IP client endpoint used when building up router instances.
    /// </summary>
    internal class TcpClientRouterFactory
    {
        protected readonly ILogger _logger;

        protected readonly string _tcpClientAddress;

        protected bool _auto_shutdown;

        protected int TcpClientTimeoutMs { get; set; } = Timeout.Infinite;

        protected int TcpClientRetryTimeoutMs { get; set; } = 500;

        protected ILogger Logger => _logger;

        public delegate TcpClientRouterFactory CreateInstanceDelegate(string tcpClient, int runtimeTimeoutMs, ILogger logger);

        public static TcpClientRouterFactory CreateDefaultInstance(string tcpClient, int runtimeTimeoutMs, ILogger logger)
        {
            return new TcpClientRouterFactory(tcpClient, runtimeTimeoutMs, logger);
        }

        public string TcpClientAddress
        {
            get { return _tcpClientAddress; }
        }

        public TcpClientRouterFactory(string tcpClient, int runtimeTimeoutMs, ILogger logger)
        {
            _logger = logger;
            _tcpClientAddress = IpcTcpSocketEndPoint.NormalizeTcpIpEndPoint(string.IsNullOrEmpty(tcpClient) ? "127.0.0.1:" + string.Format("{0}", 56000 + (Process.GetCurrentProcess().Id % 1000)) : tcpClient);
            _auto_shutdown = runtimeTimeoutMs != Timeout.Infinite;
            if (runtimeTimeoutMs != Timeout.Infinite)
            {
                TcpClientTimeoutMs = runtimeTimeoutMs;
            }
        }

        public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token)
        {
            return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false);
        }

        public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token, bool retry)
        {
            return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false);
        }

        public virtual void Start()
        {
        }

        public virtual void Stop()
        {
        }

        private async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
        {
            _logger?.LogDebug($"Connecting new tcp endpoint \"{_tcpClientAddress}\".");

            IpcTcpSocketEndPoint clientTcpEndPoint = new(_tcpClientAddress);
            using CancellationTokenSource connectTimeoutTokenSource = new();
            using CancellationTokenSource connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);

            connectTimeoutTokenSource.CancelAfter(TcpClientTimeoutMs);

            Socket clientSocket;
            do
            {
                clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);

                try
                {
                    await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, connectTokenSource.Token).ConfigureAwait(false);
                    retry = false;
                }
                catch (Exception)
                {
                    clientSocket?.Dispose();

                    if (connectTimeoutTokenSource.IsCancellationRequested)
                    {
                        _logger?.LogDebug("No tcp stream connected, timing out.");

                        if (_auto_shutdown)
                        {
                            throw new RuntimeTimeoutException(TcpClientTimeoutMs);
                        }

                        throw new TimeoutException();
                    }

                    // If we are not doing retries when runtime is unavailable, fail right away, this will
                    // break any accepted IPC connections, making sure client is notified and could reconnect.
                    // If not, retry until succeed or time out.
                    if (!retry)
                    {
                        _logger?.LogTrace($"Failed connecting {_tcpClientAddress}.");
                        throw;
                    }

                    _logger?.LogTrace($"Failed connecting {_tcpClientAddress}, wait {TcpClientRetryTimeoutMs} ms before retrying.");

                    // If we get an error (without hitting timeout above), most likely due to unavailable listener.
                    // Delay execution to prevent to rapid retry attempts.
                    await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false);
                }
            }
            while (retry);

            Stream tcpClientStream = new ExposedSocketNetworkStream(clientSocket, ownsSocket: true);
            _logger?.LogDebug("Successfully connected tcp stream.");

            return tcpClientStream;
        }

        private async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
        {
            using (token.Register(() => clientSocket.Close(0)))
            {
                try
                {
                    Func<AsyncCallback, object, IAsyncResult> beginConnect = (callback, state) => {
                        return clientSocket.BeginConnect(remoteEP, callback, state);
                    };
                    await Task.Factory.FromAsync(beginConnect, clientSocket.EndConnect, this).ConfigureAwait(false);
                }
                // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
                // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
                catch (ObjectDisposedException) when (token.IsCancellationRequested)
                {
                    // First check if the cancellation token caused the closing of the socket,
                    // then rethrow the exception if it did not.
                    token.ThrowIfCancellationRequested();
                }
            }
        }
    }

    /// <summary>
    /// This class represent a IPC server endpoint used when building up router instances.
    /// </summary>
    internal class IpcServerRouterFactory
    {
        private readonly ILogger _logger;
        private readonly string _ipcServerPath;
        private IpcServerTransport _ipcServer;

        private int IpcServerTimeoutMs { get; set; } = Timeout.Infinite;

        public string IpcServerPath
        {
            get { return _ipcServerPath; }
        }

        public IpcServerRouterFactory(string ipcServer, ILogger logger)
        {
            if (string.IsNullOrEmpty(ipcServer))
            {
                throw new ArgumentException("Missing IPC server path.");
            }

            _logger = logger;
            _ipcServerPath = ipcServer;

            _ipcServer = IpcServerTransport.Create(_ipcServerPath, IpcServerTransport.MaxAllowedConnections, ReversedDiagnosticsServer.Kind.Ipc);
        }

        public static void Start()
        {
        }

        public void Stop()
        {
            _ipcServer?.Dispose();
        }

        public async Task<Stream> AcceptIpcStreamAsync(CancellationToken token)
        {
            Stream ipcServerStream = null;

            _logger?.LogDebug($"Waiting for new ipc connection at endpoint \"{_ipcServerPath}\".");


            using CancellationTokenSource connectTimeoutTokenSource = new();
            using CancellationTokenSource connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);

            try
            {
                connectTimeoutTokenSource.CancelAfter(IpcServerTimeoutMs);
                ipcServerStream = await _ipcServer.AcceptAsync(connectTokenSource.Token).ConfigureAwait(false);
            }
            catch (Exception)
            {
                ipcServerStream?.Dispose();

                if (connectTimeoutTokenSource.IsCancellationRequested)
                {
                    _logger?.LogDebug("No ipc stream connected, timing out.");
                    throw new TimeoutException();
                }

                throw;
            }

            if (ipcServerStream != null)
            {
                _logger?.LogDebug("Successfully connected ipc stream.");
            }

            return ipcServerStream;
        }
    }

    /// <summary>
    /// This class represent a IPC client endpoint used when building up router instances.
    /// </summary>
    internal class IpcClientRouterFactory
    {
        private readonly ILogger _logger;
        private readonly string _ipcClientPath;

        private int IpcClientTimeoutMs { get; set; } = Timeout.Infinite;

        private int IpcClientRetryTimeoutMs { get; set; } = 500;

        public string IpcClientPath
        {
            get { return _ipcClientPath; }
        }

        public IpcClientRouterFactory(string ipcClient, ILogger logger)
        {
            _logger = logger;
            _ipcClientPath = ipcClient;
        }

        public async Task<Stream> ConnectIpcStreamAsync(CancellationToken token)
        {
            Stream ipcClientStream = null;

            _logger?.LogDebug($"Connecting new ipc endpoint \"{_ipcClientPath}\".");

            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                NamedPipeClientStream namedPipe = new(
                    ".",
                    _ipcClientPath,
                    PipeDirection.InOut,
                    PipeOptions.Asynchronous,
                    TokenImpersonationLevel.Impersonation);

                try
                {
                    await namedPipe.ConnectAsync(IpcClientTimeoutMs, token).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    namedPipe?.Dispose();

                    if (ex is TimeoutException)
                    {
                        _logger?.LogDebug("No ipc stream connected, timing out.");
                    }

                    throw;
                }

                ipcClientStream = namedPipe;
            }
            else
            {
                IpcUnixDomainSocket unixDomainSocket;

                using CancellationTokenSource connectTimeoutTokenSource = new();
                using CancellationTokenSource connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);

                connectTimeoutTokenSource.CancelAfter(IpcClientTimeoutMs);

                bool retry;
                do
                {
                    unixDomainSocket = new IpcUnixDomainSocket();

                    try
                    {
                        await unixDomainSocket.ConnectAsync(new IpcUnixDomainSocketEndPoint(_ipcClientPath), token).ConfigureAwait(false);
                        retry = false;
                    }
                    catch (Exception)
                    {
                        unixDomainSocket?.Dispose();

                        if (connectTimeoutTokenSource.IsCancellationRequested)
                        {
                            _logger?.LogDebug("No ipc stream connected, timing out.");
                            throw new TimeoutException();
                        }

                        _logger?.LogTrace($"Failed connecting {_ipcClientPath}, wait {IpcClientRetryTimeoutMs} ms before retrying.");

                        // If we get an error (without hitting timeout above), most likely due to unavailable listener.
                        // Delay execution to prevent to rapid retry attempts.
                        await Task.Delay(IpcClientRetryTimeoutMs, token).ConfigureAwait(false);

                        retry = true;
                    }
                }
                while (retry);

                ipcClientStream = new ExposedSocketNetworkStream(unixDomainSocket, ownsSocket: true);
            }

            if (ipcClientStream != null)
            {
                _logger?.LogDebug("Successfully connected ipc stream.");
            }

            return ipcClientStream;
        }
    }

    /// <summary>
    /// This class creates IPC Server - TCP Server router instances.
    /// Supports NamedPipes/UnixDomainSocket server and TCP/IP server.
    /// </summary>
    internal class IpcServerTcpServerRouterFactory : DiagnosticsServerRouterFactory
    {
        private ILogger _logger;
        private NetServerRouterFactory _netServerRouterFactory;
        private IpcServerRouterFactory _ipcServerRouterFactory;

        public IpcServerTcpServerRouterFactory(string ipcServer, string tcpServer, int runtimeTimeoutMs, NetServerRouterFactory.CreateInstanceDelegate factory, ILogger logger)
        {
            _logger = logger;
            _netServerRouterFactory = factory(tcpServer, runtimeTimeoutMs, logger);
            _ipcServerRouterFactory = new IpcServerRouterFactory(ipcServer, logger);
        }

        public override string IpcAddress
        {
            get
            {
                return _ipcServerRouterFactory.IpcServerPath;
            }
        }

        public override string TcpAddress
        {
            get
            {
                return _netServerRouterFactory.ServerAddress;
            }
        }

        public override ILogger Logger
        {
            get
            {
                return _logger;
            }
        }

        public override Task Start(CancellationToken token)
        {
            _netServerRouterFactory.Start();
            IpcServerRouterFactory.Start();

            _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> {_netServerRouterFactory.ServerTransportName} server ({_netServerRouterFactory.ServerAddress}) router.");

            return Task.CompletedTask;
        }

        public override Task Stop()
        {
            _logger?.LogInformation($"Stopping IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> {_netServerRouterFactory.ServerTransportName} server ({_netServerRouterFactory.ServerAddress}) router.");
            _ipcServerRouterFactory.Stop();
            return _netServerRouterFactory.Stop();
        }

        public override void Reset()
        {
            _netServerRouterFactory.Reset();
        }

        public override async Task<Router> CreateRouterAsync(CancellationToken token)
        {
            Stream tcpServerStream = null;
            Stream ipcServerStream = null;

            _logger?.LogDebug($"Trying to create new router instance.");

            try
            {
                using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);

                // Get new tcp server endpoint.
                using Task<Stream> netServerStreamTask = _netServerRouterFactory.AcceptNetStreamAsync(cancelRouter.Token);

                // Get new ipc server endpoint.
                using Task<Stream> ipcServerStreamTask = _ipcServerRouterFactory.AcceptIpcStreamAsync(cancelRouter.Token);

                await Task.WhenAny(ipcServerStreamTask, netServerStreamTask).ConfigureAwait(false);

                if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcServerStreamTask) && DiagnosticsServerRouterFactory.IsCompletedSuccessfully(netServerStreamTask))
                {
                    ipcServerStream = ipcServerStreamTask.Result;
                    tcpServerStream = netServerStreamTask.Result;
                }
                else if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcServerStreamTask))
                {
                    ipcServerStream = ipcServerStreamTask.Result;

                    // We have a valid ipc stream and a pending tcp accept. Wait for completion
                    // or disconnect of ipc stream.
                    using Task checkIpcStreamTask = IsStreamConnectedAsync(ipcServerStream, cancelRouter.Token);

                    // Wait for at least completion of one task.
                    await Task.WhenAny(netServerStreamTask, checkIpcStreamTask).ConfigureAwait(false);

                    // Cancel out any pending tasks not yet completed.
                    cancelRouter.Cancel();

                    try
                    {
                        await Task.WhenAll(netServerStreamTask, checkIpcStreamTask).ConfigureAwait(false);
                    }
                    catch (Exception)
                    {
                        // Check if we have an accepted tcp stream.
                        if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(netServerStreamTask))
                        {
                            netServerStreamTask.Result?.Dispose();
                        }

                        if (checkIpcStreamTask.IsFaulted)
                        {
                            _logger?.LogInformation($"Broken ipc connection detected, aborting {_netServerRouterFactory.ServerTransportName} connection.");
                            checkIpcStreamTask.GetAwaiter().GetResult();
                        }

                        throw;
                    }

                    tcpServerStream = netServerStreamTask.Result;
                }
                else if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(netServerStreamTask))
                {
                    tcpServerStream = netServerStreamTask.Result;

                    // We have a valid tcp stream and a pending ipc accept. Wait for completion
                    // or disconnect of tcp stream.
                    using Task checkTcpStreamTask = IsStreamConnectedAsync(tcpServerStream, cancelRouter.Token);

                    // Wait for at least completion of one task.
                    await Task.WhenAny(ipcServerStreamTask, checkTcpStreamTask).ConfigureAwait(false);

                    // Cancel out any pending tasks not yet completed.
                    cancelRouter.Cancel();

                    try
                    {
                        await Task.WhenAll(ipcServerStreamTask, checkTcpStreamTask).ConfigureAwait(false);
                    }
                    catch (Exception)
                    {
                        // Check if we have an accepted ipc stream.
                        if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcServerStreamTask))
                        {
                            ipcServerStreamTask.Result?.Dispose();
                        }

                        if (checkTcpStreamTask.IsFaulted)
                        {
                            _logger?.LogInformation($"Broken {_netServerRouterFactory.ServerTransportName} connection detected, aborting ipc connection.");
                            checkTcpStreamTask.GetAwaiter().GetResult();
                        }

                        throw;
                    }

                    ipcServerStream = ipcServerStreamTask.Result;
                }
                else
                {
                    // Error case, cancel out. wait and throw exception.
                    cancelRouter.Cancel();
                    try
                    {
                        await Task.WhenAll(ipcServerStreamTask, netServerStreamTask).ConfigureAwait(false);
                    }
                    catch (Exception)
                    {
                        // Check if we have an ipc stream.
                        if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcServerStreamTask))
                        {
                            ipcServerStreamTask.Result?.Dispose();
                        }

                        throw;
                    }
                }
            }
            catch (Exception)
            {
                _logger?.LogDebug("Failed creating new router instance.");

                // Cleanup and rethrow.
                ipcServerStream?.Dispose();
                tcpServerStream?.Dispose();

                throw;
            }

            // Create new router.
            _logger?.LogDebug("New router instance successfully created.");

            return new Router(ipcServerStream, tcpServerStream, _logger);
        }
    }

    /// <summary>
    /// This class creates IPC Server - TCP Client router instances.
    /// Supports NamedPipes/UnixDomainSocket server and TCP/IP client.
    /// </summary>
    internal class IpcServerTcpClientRouterFactory : DiagnosticsServerRouterFactory
    {
        private ILogger _logger;
        private IpcServerRouterFactory _ipcServerRouterFactory;
        private TcpClientRouterFactory _tcpClientRouterFactory;

        public IpcServerTcpClientRouterFactory(string ipcServer, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
        {
            _logger = logger;
            _ipcServerRouterFactory = new IpcServerRouterFactory(ipcServer, logger);
            _tcpClientRouterFactory = factory(tcpClient, runtimeTimeoutMs, logger);
        }

        public override string IpcAddress
        {
            get
            {
                return _ipcServerRouterFactory.IpcServerPath;
            }
        }

        public override string TcpAddress
        {
            get
            {
                return _tcpClientRouterFactory.TcpClientAddress;
            }
        }

        public override ILogger Logger
        {
            get
            {
                return _logger;
            }
        }

        public override Task Start(CancellationToken token)
        {
            IpcServerRouterFactory.Start();
            _tcpClientRouterFactory.Start();
            _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");

            return Task.CompletedTask;
        }

        public override Task Stop()
        {
            _logger?.LogInformation($"Stopping IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
            _tcpClientRouterFactory.Stop();
            _ipcServerRouterFactory.Stop();
            return Task.CompletedTask;
        }

        public override async Task<Router> CreateRouterAsync(CancellationToken token)
        {
            Stream tcpClientStream = null;
            Stream ipcServerStream = null;

            _logger?.LogDebug("Trying to create a new router instance.");

            try
            {
                using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);

                // Get new server endpoint.
                ipcServerStream = await _ipcServerRouterFactory.AcceptIpcStreamAsync(cancelRouter.Token).ConfigureAwait(false);

                // Get new client endpoint.
                using Task<Stream> tcpClientStreamTask = _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token);

                // We have a valid ipc stream and a pending tcp stream. Wait for completion
                // or disconnect of ipc stream.
                using Task checkIpcStreamTask = IsStreamConnectedAsync(ipcServerStream, cancelRouter.Token);

                // Wait for at least completion of one task.
                await Task.WhenAny(tcpClientStreamTask, checkIpcStreamTask).ConfigureAwait(false);

                // Cancel out any pending tasks not yet completed.
                cancelRouter.Cancel();

                try
                {
                    await Task.WhenAll(tcpClientStreamTask, checkIpcStreamTask).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Check if we have an accepted tcp stream.
                    if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(tcpClientStreamTask))
                    {
                        tcpClientStreamTask.Result?.Dispose();
                    }

                    if (checkIpcStreamTask.IsFaulted)
                    {
                        _logger?.LogInformation("Broken ipc connection detected, aborting tcp connection.");
                        checkIpcStreamTask.GetAwaiter().GetResult();
                    }

                    throw;
                }

                tcpClientStream = tcpClientStreamTask.Result;
            }
            catch (Exception)
            {
                _logger?.LogDebug("Failed creating new router instance.");

                // Cleanup and rethrow.
                ipcServerStream?.Dispose();
                tcpClientStream?.Dispose();

                throw;
            }

            // Create new router.
            _logger?.LogDebug("New router instance successfully created.");

            return new Router(ipcServerStream, tcpClientStream, _logger);
        }
    }

    /// <summary>
    /// This class creates IPC Client - TCP Server router instances.
    /// Supports NamedPipes/UnixDomainSocket client and TCP/IP server.
    /// </summary>
    internal class IpcClientTcpServerRouterFactory : DiagnosticsServerRouterFactory
    {
        private ILogger _logger;
        private IpcClientRouterFactory _ipcClientRouterFactory;
        private NetServerRouterFactory _tcpServerRouterFactory;

        public IpcClientTcpServerRouterFactory(string ipcClient, string tcpServer, int runtimeTimeoutMs, NetServerRouterFactory.CreateInstanceDelegate factory, ILogger logger)
        {
            _logger = logger;
            _ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger);
            _tcpServerRouterFactory = factory(tcpServer, runtimeTimeoutMs, logger);
        }

        public override string IpcAddress
        {
            get
            {
                return _ipcClientRouterFactory.IpcClientPath;
            }
        }

        public override string TcpAddress
        {
            get
            {
                return _tcpServerRouterFactory.ServerAddress;
            }
        }

        public override ILogger Logger
        {
            get
            {
                return _logger;
            }
        }

        public override Task Start(CancellationToken token)
        {
            if (string.IsNullOrEmpty(_ipcClientRouterFactory.IpcClientPath))
            {
                throw new ArgumentException("No IPC client path specified.");
            }

            _tcpServerRouterFactory.Start();

            _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> {_tcpServerRouterFactory.ServerTransportName} server ({_tcpServerRouterFactory.ServerAddress}) router.");

            return Task.CompletedTask;
        }

        public override Task Stop()
        {
            _logger?.LogInformation($"Stopping IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> {_tcpServerRouterFactory.ServerTransportName} server ({_tcpServerRouterFactory.ServerAddress}) router.");
            return _tcpServerRouterFactory.Stop();
        }

        public override void Reset()
        {
            _tcpServerRouterFactory.Reset();
        }

        public override async Task<Router> CreateRouterAsync(CancellationToken token)
        {
            Stream tcpServerStream = null;
            Stream ipcClientStream = null;

            _logger?.LogDebug("Trying to create a new router instance.");

            try
            {
                using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);

                // Get new server endpoint.
                tcpServerStream = await _tcpServerRouterFactory.AcceptNetStreamAsync(cancelRouter.Token).ConfigureAwait(false);

                // Get new client endpoint.
                using Task<Stream> ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token);

                // We have a valid tcp stream and a pending ipc stream. Wait for completion
                // or disconnect of tcp stream.
                using Task checkTcpStreamTask = IsStreamConnectedAsync(tcpServerStream, cancelRouter.Token);

                // Wait for at least completion of one task.
                await Task.WhenAny(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);

                // Cancel out any pending tasks not yet completed.
                cancelRouter.Cancel();

                try
                {
                    await Task.WhenAll(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Check if we have an accepted ipc stream.
                    if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcClientStreamTask))
                    {
                        ipcClientStreamTask.Result?.Dispose();
                    }

                    if (checkTcpStreamTask.IsFaulted)
                    {
                        _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
                        checkTcpStreamTask.GetAwaiter().GetResult();
                    }

                    throw;
                }

                ipcClientStream = ipcClientStreamTask.Result;

                try
                {
                    // TcpServer consumes advertise message, needs to be replayed back to ipc client.
                    await IpcAdvertise.SerializeAsync(ipcClientStream, _tcpServerRouterFactory.RuntimeInstanceId, (ulong)_tcpServerRouterFactory.RuntimeProcessId, token).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    _logger?.LogDebug("Failed sending advertise message.");
                    throw;
                }
            }
            catch (Exception)
            {
                _logger?.LogDebug("Failed creating new router instance.");

                // Cleanup and rethrow.
                tcpServerStream?.Dispose();
                ipcClientStream?.Dispose();

                throw;
            }

            // Create new router.
            _logger?.LogDebug("New router instance successfully created.");

            return new Router(ipcClientStream, tcpServerStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
        }
    }

    /// <summary>
    /// This class creates IPC Client - TCP Client router instances.
    /// Supports NamedPipes/UnixDomainSocket client and TCP/IP client.
    /// </summary>
    internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory
    {
        private bool _updateRuntimeInfo;
        private Guid _runtimeInstanceId;
        private ulong _runtimeProcessId;
        private ILogger _logger;
        private IpcClientRouterFactory _ipcClientRouterFactory;
        private TcpClientRouterFactory _tcpClientRouterFactory;

        public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
        {
            _updateRuntimeInfo = true;
            _runtimeInstanceId = Guid.Empty;
            _runtimeProcessId = 0;
            _logger = logger;
            _ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger);
            _tcpClientRouterFactory = factory(tcpClient, runtimeTimeoutMs, logger);
        }

        public override string IpcAddress
        {
            get
            {
                return _ipcClientRouterFactory.IpcClientPath;
            }
        }

        public override string TcpAddress
        {
            get
            {
                return _tcpClientRouterFactory.TcpClientAddress;
            }
        }

        public override ILogger Logger
        {
            get
            {
                return _logger;
            }
        }

        public override Task Start(CancellationToken token)
        {
            _tcpClientRouterFactory.Start();
            _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
            return Task.CompletedTask;
        }

        public override Task Stop()
        {
            _logger?.LogInformation($"Stopping IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
            _tcpClientRouterFactory.Stop();
            return Task.CompletedTask;
        }

        public override async Task<Router> CreateRouterAsync(CancellationToken token)
        {
            Stream tcpClientStream = null;
            Stream ipcClientStream = null;

            int initFrontendToBackendByteTransfer = 0;
            int initBackendToFrontendByteTransfer = 0;

            await UpdateRuntimeInfo(token).ConfigureAwait(false);

            _logger?.LogDebug("Trying to create a new router instance.");

            try
            {
                using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);

                // Get new tcp client endpoint.
                tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token, true).ConfigureAwait(false);

                // Get new ipc client endpoint.
                using Task<Stream> ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token);

                // We have a valid tcp stream and a pending ipc stream. Wait for completion
                // or disconnect of tcp stream.
                using Task checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelRouter.Token);

                // Wait for at least completion of one task.
                await Task.WhenAny(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);

                // Cancel out any pending tasks not yet completed.
                cancelRouter.Cancel();

                try
                {
                    await Task.WhenAll(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Check if we have an accepted ipc stream.
                    if (DiagnosticsServerRouterFactory.IsCompletedSuccessfully(ipcClientStreamTask))
                    {
                        ipcClientStreamTask.Result?.Dispose();
                    }

                    if (checkTcpStreamTask.IsFaulted)
                    {
                        _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
                        checkTcpStreamTask.GetAwaiter().GetResult();
                        _updateRuntimeInfo = true;
                    }

                    throw;
                }

                ipcClientStream = ipcClientStreamTask.Result;

                try
                {
                    await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false);
                    initBackendToFrontendByteTransfer = IpcAdvertise.V1SizeInBytes;
                }
                catch (Exception)
                {
                    _logger?.LogDebug("Failed sending advertise message.");
                    throw;
                }

                // Router needs to emulate backend behavior when running in client-client mode.
                // A new router instance can not be complete until frontend starts to
                // write data to backend or a new router instance will connect against frontend
                // that in turn will disconnects previous accepted but pending connections, triggering
                // frequent connects/disconnects.
                initFrontendToBackendByteTransfer = await InitFrontendReadBackendWrite(ipcClientStream, tcpClientStream, token).ConfigureAwait(false);
            }
            catch (Exception)
            {
                _logger?.LogDebug("Failed creating new router instance.");

                if (tcpClientStream == null || (tcpClientStream != null && ipcClientStream == null))
                {
                    _updateRuntimeInfo = true;
                }

                // Cleanup and rethrow.
                tcpClientStream?.Dispose();
                ipcClientStream?.Dispose();

                throw;
            }

            // Create new router.
            _logger?.LogDebug("New router instance successfully created.");

            return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)initBackendToFrontendByteTransfer, (ulong)initFrontendToBackendByteTransfer);
        }

        private async Task<int> InitFrontendReadBackendWrite(Stream ipcClientStream, Stream tcpClientStream, CancellationToken token)
        {
            using CancellationTokenSource cancelReadConnect = CancellationTokenSource.CreateLinkedTokenSource(token);

            byte[] buffer = new byte[1024];
            using Task<int> readTask = ipcClientStream.ReadAsync(buffer, 0, buffer.Length, cancelReadConnect.Token);

            // Check tcp client connection while waiting on ipc client.
            using Task checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelReadConnect.Token);

            // Wait for completion of at least one task.
            await Task.WhenAny(readTask, checkTcpStreamTask).ConfigureAwait(false);

            // Cancel out any pending tasks not yet completed.
            cancelReadConnect.Cancel();

            try
            {
                await Task.WhenAll(readTask, checkTcpStreamTask).ConfigureAwait(false);
            }
            catch (Exception)
            {
                if (readTask.IsFaulted)
                {
                    _logger?.LogInformation("Broken ipc connection detected.");
                }

                if (checkTcpStreamTask.IsFaulted)
                {
                    _logger?.LogInformation("Broken tcp connection detected.");
                    _updateRuntimeInfo = true;
                }

                throw;
            }

            int bytesRead = readTask.Result;
            if (bytesRead == 0)
            {
                _logger?.LogDebug("ReverseDiagnosticServer disconnected ipc connection.");
                throw new DiagnosticsClientException("ReverseDiagnosticServer disconnect detected.");
            }

            await tcpClientStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);

            return bytesRead;
        }

        private async Task UpdateRuntimeInfo(CancellationToken token)
        {
            if (!_updateRuntimeInfo)
            {
                return;
            }

            try
            {
                _logger?.LogDebug($"Requesting runtime process information.");

                // Get new tcp client endpoint.
                using Stream tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(token, true).ConfigureAwait(false);

                // Request process info.
                IpcMessage message = new(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);

                byte[] buffer = message.Serialize();
                await tcpClientStream.WriteAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);

                IpcMessage response = IpcMessage.Parse(tcpClientStream);
                if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
                {
                    ProcessInfo info = ProcessInfo.ParseV1(response.Payload);

                    _runtimeProcessId = info.ProcessId;
                    _runtimeInstanceId = info.RuntimeInstanceCookie;

                    _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
                }
                else
                {
                    throw new ServerErrorException("Failed to retrieve runtime process info.");
                }
            }
            catch (Exception)
            {
                _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
                _runtimeInstanceId = Guid.NewGuid();
                _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
            }
            _updateRuntimeInfo = false;
        }
    }

    internal class Router : IDisposable
    {
        private readonly ILogger _logger;
        private Stream _frontendStream;
        private Stream _backendStream;
        private Task _backendReadFrontendWriteTask;
        private Task _frontendReadBackendWriteTask;
        private CancellationTokenSource _cancelRouterTokenSource;
        private bool _disposed;
        private ulong _backendToFrontendByteTransfer;
        private ulong _frontendToBackendByteTransfer;
        private static int s_routerInstanceCount;

        public TaskCompletionSource<bool> RouterTaskCompleted { get; }

        public Router(Stream frontendStream, Stream backendStream, ILogger logger, ulong initBackendToFrontendByteTransfer = 0, ulong initFrontendToBackendByteTransfer = 0)
        {
            _logger = logger;

            _frontendStream = frontendStream;
            _backendStream = backendStream;

            _cancelRouterTokenSource = new CancellationTokenSource();

            RouterTaskCompleted = new TaskCompletionSource<bool>();

            _backendToFrontendByteTransfer = initBackendToFrontendByteTransfer;
            _frontendToBackendByteTransfer = initFrontendToBackendByteTransfer;

            Interlocked.Increment(ref s_routerInstanceCount);
        }

        public void Start()
        {
            if (_backendReadFrontendWriteTask != null || _frontendReadBackendWriteTask != null || _disposed)
            {
                throw new InvalidOperationException();
            }

            _backendReadFrontendWriteTask = BackendReadFrontendWrite(_cancelRouterTokenSource.Token);
            _frontendReadBackendWriteTask = FrontendReadBackendWrite(_cancelRouterTokenSource.Token);
        }

        public async void Stop()
        {
#pragma warning disable CA1513 // Use ObjectDisposedException throw helper
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(Router));
            }
#pragma warning restore CA1513 // Use ObjectDisposedException throw helper

            _cancelRouterTokenSource.Cancel();

            List<Task> runningTasks = new();

            if (_backendReadFrontendWriteTask != null)
            {
                runningTasks.Add(_backendReadFrontendWriteTask);
            }

            if (_frontendReadBackendWriteTask != null)
            {
                runningTasks.Add(_frontendReadBackendWriteTask);
            }

            await Task.WhenAll(runningTasks.ToArray()).ConfigureAwait(false);

            _backendReadFrontendWriteTask?.Dispose();
            _frontendReadBackendWriteTask?.Dispose();

            RouterTaskCompleted?.TrySetResult(true);

            _backendReadFrontendWriteTask = null;
            _frontendReadBackendWriteTask = null;
        }

        public bool IsRunning
        {
            get
            {
                if (_backendReadFrontendWriteTask == null || _frontendReadBackendWriteTask == null || _disposed)
                {
                    return false;
                }

                return !_backendReadFrontendWriteTask.IsCompleted && !_frontendReadBackendWriteTask.IsCompleted;
            }
        }
        public void Dispose()
        {
            if (!_disposed)
            {
                Stop();

                _cancelRouterTokenSource.Dispose();

                _backendStream?.Dispose();
                _frontendStream?.Dispose();

                _disposed = true;

                Interlocked.Decrement(ref s_routerInstanceCount);

                _logger?.LogDebug($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes.");
                _logger?.LogDebug($"Active instances: {s_routerInstanceCount}");
            }
        }

        private async Task BackendReadFrontendWrite(CancellationToken token)
        {
            try
            {
                byte[] buffer = new byte[1024];
                while (!token.IsCancellationRequested)
                {
                    _logger?.LogTrace("Start reading bytes from back end.");

                    int bytesRead = await _backendStream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);

                    _logger?.LogTrace($"Read {bytesRead} bytes from back end.");

                    // Check for end of stream indicating that remote end disconnected.
                    if (bytesRead == 0)
                    {
                        _logger?.LogTrace("Back end disconnected.");
                        break;
                    }

                    _backendToFrontendByteTransfer += (ulong)bytesRead;

                    _logger?.LogTrace($"Start writing {bytesRead} bytes to front end.");

                    await _frontendStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods
                    await _frontendStream.FlushAsync().ConfigureAwait(false);
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods

                    _logger?.LogTrace($"Wrote {bytesRead} bytes to front end.");
                }
            }
            catch (Exception)
            {
                // Completing task will trigger dispose of instance and cleanup.
                // Faliure mainly consists of closed/disposed streams and cancelation requests.
                // Just make sure task gets complete, nothing more needs to be in response to these exceptions.
                _logger?.LogTrace("Failed stream operation. Completing task.");
            }

            RouterTaskCompleted?.TrySetResult(true);
        }

        private async Task FrontendReadBackendWrite(CancellationToken token)
        {
            try
            {
                byte[] buffer = new byte[1024];
                while (!token.IsCancellationRequested)
                {
                    _logger?.LogTrace("Start reading bytes from front end.");

                    int bytesRead = await _frontendStream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);

                    _logger?.LogTrace($"Read {bytesRead} bytes from front end.");

                    // Check for end of stream indicating that remote end disconnected.
                    if (bytesRead == 0)
                    {
                        _logger?.LogTrace("Front end disconnected.");
                        break;
                    }

                    _frontendToBackendByteTransfer += (ulong)bytesRead;

                    _logger?.LogTrace($"Start writing {bytesRead} bytes to back end.");

                    await _backendStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods
                    await _backendStream.FlushAsync().ConfigureAwait(false);
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods

                    _logger?.LogTrace($"Wrote {bytesRead} bytes to back end.");
                }
            }
            catch (Exception)
            {
                // Completing task will trigger dispose of instance and cleanup.
                // Faliure mainly consists of closed/disposed streams and cancelation requests.
                // Just make sure task gets complete, nothing more needs to be in response to these exceptions.
                _logger?.LogTrace("Failed stream operation. Completing task.");
            }

            RouterTaskCompleted?.TrySetResult(true);
        }
    }
}