File: System\ServiceModel\Channels\ClientReliableDuplexSessionChannel.cs
Web Access
Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System.Runtime;
using System.Threading.Tasks;
using System.Xml;
 
namespace System.ServiceModel.Channels
{
    internal abstract class ReliableDuplexSessionChannel : DuplexChannel, IDuplexSessionChannel, IAsyncDuplexSessionChannel
    {
        private bool _acknowledgementScheduled = false;
        private IOThreadTimer _acknowledgementTimer;
        private ulong _ackVersion = 1;
        private bool _advertisedZero = false;
        private InterruptibleWaitObject _closeOutputWaitObject;
        private SendWaitReliableRequestor _closeRequestor;
        private DeliveryStrategy<Message> _deliveryStrategy;
        private Guard _guard = new Guard(int.MaxValue);
        private ReliableInputConnection _inputConnection;
        private Exception _maxRetryCountException = null;
        private int _pendingAcknowledgements = 0;
        private SendWaitReliableRequestor _terminateRequestor;
        private static Action<object> s_processMessageStatic = new Action<object>(ProcessMessageStatic);
        protected static Func<object, Task> s_startReceivingAsyncStatic = new Func<object, Task>(StartReceivingAsyncStatic);
 
        protected ReliableDuplexSessionChannel(ChannelManagerBase manager, IReliableFactorySettings settings, IReliableChannelBinder binder)
            : base(manager, binder.LocalAddress)
        {
            Binder = binder;
            Settings = settings;
            _acknowledgementTimer = new IOThreadTimer(new Func<object, Task>(OnAcknowledgementTimeoutElapsedAsync), null, true);
            Binder.Faulted += OnBinderFaulted;
            Binder.OnException += OnBinderException;
        }
 
        public IReliableChannelBinder Binder { get; }
 
        public override EndpointAddress LocalAddress => Binder.LocalAddress;
 
        protected ReliableOutputConnection OutputConnection { get; private set; }
 
        protected UniqueId OutputID => ReliableSession.OutputID;
 
        protected ChannelReliableSession ReliableSession { get; private set; }
 
        public override EndpointAddress RemoteAddress => Binder.RemoteAddress;
 
        protected IReliableFactorySettings Settings { get; }
 
        public override Uri Via => RemoteAddress.Uri;
 
        public IDuplexSession Session => (IDuplexSession)ReliableSession;
 
        IAsyncDuplexSession ISessionChannel<IAsyncDuplexSession>.Session => (IAsyncDuplexSession)ReliableSession;
 
        private void AddPendingAcknowledgements(Message message)
        {
            using(ThisAsyncLock.TakeLock())
            {
                if (_pendingAcknowledgements > 0)
                {
                    _acknowledgementTimer.Cancel();
                    _acknowledgementScheduled = false;
                    _pendingAcknowledgements = 0;
                    _ackVersion++;
 
                    int bufferRemaining = GetBufferRemaining();
 
                    WsrmUtilities.AddAcknowledgementHeader(
                        Settings.ReliableMessagingVersion,
                        message,
                        ReliableSession.InputID,
                        _inputConnection.Ranges,
                        _inputConnection.IsLastKnown,
                        bufferRemaining);
                }
            }
        }
 
        private Task CloseSequenceAsync(TimeSpan timeout)
        {
            CreateCloseRequestor();
            return _closeRequestor.RequestAsync(timeout);
            // reply came from receive loop, receive loop owns verified message so nothing more to do.
        }
 
        private void ConfigureRequestor(ReliableRequestor requestor)
        {
            requestor.MessageVersion = Settings.MessageVersion;
            requestor.Binder = Binder;
            requestor.SetRequestResponsePattern();
        }
 
        private Message CreateAcknowledgmentMessage()
        {
            using(ThisAsyncLock.TakeLock())
                _ackVersion++;
 
            int bufferRemaining = GetBufferRemaining();
 
            Message message = WsrmUtilities.CreateAcknowledgmentMessage(Settings.MessageVersion,
                Settings.ReliableMessagingVersion, ReliableSession.InputID, _inputConnection.Ranges,
                _inputConnection.IsLastKnown, bufferRemaining);
 
            if (WcfEventSource.Instance.SequenceAcknowledgementSentIsEnabled())
            {
                WcfEventSource.Instance.SequenceAcknowledgementSent(ReliableSession.Id);
            }
 
            return message;
        }
 
        private void CreateCloseRequestor()
        {
            SendWaitReliableRequestor temp = new SendWaitReliableRequestor();
 
            ConfigureRequestor(temp);
            temp.TimeoutString1Index = SRP.TimeoutOnClose;
            temp.MessageAction = WsrmIndex.GetCloseSequenceActionHeader(
                Settings.MessageVersion.Addressing);
            temp.MessageBody = new CloseSequence(ReliableSession.OutputID, OutputConnection.Last);
 
            using(ThisAsyncLock.TakeLock())
            {
                ThrowIfClosed();
                _closeRequestor = temp;
            }
        }
 
        private void CreateTerminateRequestor()
        {
            SendWaitReliableRequestor temp = new SendWaitReliableRequestor();
 
            ConfigureRequestor(temp);
            ReliableMessagingVersion reliableMessagingVersion = Settings.ReliableMessagingVersion;
            temp.MessageAction = WsrmIndex.GetTerminateSequenceActionHeader(
                Settings.MessageVersion.Addressing, reliableMessagingVersion);
            temp.MessageBody = new TerminateSequence(reliableMessagingVersion, ReliableSession.OutputID,
                OutputConnection.Last);
 
            using(ThisAsyncLock.TakeLock())
            {
                ThrowIfClosed();
                _terminateRequestor = temp;
 
                if (_inputConnection.IsLastKnown)
                {
                    ReliableSession.CloseSession();
                }
            }
        }
 
        private int GetBufferRemaining()
        {
            int bufferRemaining = -1;
 
            if (Settings.FlowControlEnabled)
            {
                bufferRemaining = Settings.MaxTransferWindowSize - _deliveryStrategy.EnqueuedCount;
                _advertisedZero = (bufferRemaining == 0);
            }
 
            return bufferRemaining;
        }
 
        public override T GetProperty<T>()
        {
            if (typeof(T) == typeof(IDuplexSessionChannel))
            {
                return (T)(object)this;
            }
 
            T baseProperty = base.GetProperty<T>();
            if (baseProperty != null)
            {
                return baseProperty;
            }
 
            T innerProperty = Binder.Channel.GetProperty<T>();
            if ((innerProperty == null) && (typeof(T) == typeof(FaultConverter)))
            {
                return (T)(object)FaultConverter.GetDefaultFaultConverter(Settings.MessageVersion);
            }
            else
            {
                return innerProperty;
            }
        }
 
        private async Task InternalCloseOutputSessionAsync(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            await OutputConnection.CloseAsync(timeoutHelper.RemainingTime());
 
            if (Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                await CloseSequenceAsync(timeoutHelper.RemainingTime());
            }
 
            await TerminateSequenceAsync(timeoutHelper.RemainingTime());
        }
 
        protected virtual void OnRemoteActivity()
        {
            ReliableSession.OnRemoteActivity(false);
        }
 
        private WsrmFault ProcessCloseOrTerminateSequenceResponse(bool close, WsrmMessageInfo info)
        {
            SendWaitReliableRequestor requestor = close ? _closeRequestor : _terminateRequestor;
 
            if (requestor != null)
            {
                WsrmFault fault = close
                    ? WsrmUtilities.ValidateCloseSequenceResponse(ReliableSession, _closeRequestor.MessageId, info,
                    OutputConnection.Last)
                    : WsrmUtilities.ValidateTerminateSequenceResponse(ReliableSession, _terminateRequestor.MessageId,
                    info, OutputConnection.Last);
 
                if (fault != null)
                {
                    return fault;
                }
 
                requestor.SetInfo(info);
                return null;
            }
 
            string request = close ? Wsrm11Strings.CloseSequence : WsrmFeb2005Strings.TerminateSequence;
            string faultString = SRP.Format(SRP.ReceivedResponseBeforeRequestFaultString, request);
            string exceptionString = SRP.Format(SRP.ReceivedResponseBeforeRequestExceptionString, request);
            return SequenceTerminatedFault.CreateProtocolFault(ReliableSession.OutputID, faultString, exceptionString);
        }
 
        protected async Task ProcessDuplexMessageAsync(WsrmMessageInfo info)
        {
            bool closeMessage = true;
 
            try
            {
                bool wsrmFeb2005 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
                bool wsrm11 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
                bool final = false;
 
                if (OutputConnection != null && info.AcknowledgementInfo != null)
                {
                    final = wsrm11 && info.AcknowledgementInfo.Final;
 
                    int bufferRemaining = -1;
 
                    if (Settings.FlowControlEnabled)
                        bufferRemaining = info.AcknowledgementInfo.BufferRemaining;
 
                    OutputConnection.ProcessTransferred(info.AcknowledgementInfo.Ranges, bufferRemaining);
                }
 
                OnRemoteActivity();
 
                bool tryAckNow = (info.AckRequestedInfo != null);
                bool forceAck = false;
                bool terminate = false;
                bool scheduleShutdown = false;
                ulong oldAckVersion = 0;
                WsrmFault fault = null;
                Message message = null;
                Exception remoteFaultException = null;
 
                if (info.SequencedMessageInfo != null)
                {
                    bool needDispatch = false;
 
                    using (ThisAsyncLock.TakeLock())
                    {
                        if (Aborted || State == CommunicationState.Faulted)
                        {
                            return;
                        }
 
                        long sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
                        bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
 
                        if (!_inputConnection.IsValid(sequenceNumber, isLast))
                        {
                            if (wsrmFeb2005)
                            {
                                fault = new LastMessageNumberExceededFault(ReliableSession.InputID);
                            }
                            else
                            {
                                message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
                                    Settings.MessageVersion, Settings.ReliableMessagingVersion);
                                forceAck = true;
 
                                OnMessageDropped();
                            }
                        }
                        else if (_inputConnection.Ranges.Contains(sequenceNumber))
                        {
                            OnMessageDropped();
                            tryAckNow = true;
                        }
                        else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
                        {
                            _inputConnection.Merge(sequenceNumber, isLast);
 
                            if (_inputConnection.AllAdded)
                            {
                                scheduleShutdown = true;
 
                                if (OutputConnection.CheckForTermination())
                                {
                                    ReliableSession.CloseSession();
                                }
                            }
                        }
                        else if (State == CommunicationState.Closing)
                        {
                            if (wsrmFeb2005)
                            {
                                fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                                    SRP.SequenceTerminatedSessionClosedBeforeDone,
                                    SRP.SessionClosedBeforeDone);
                            }
                            else
                            {
                                message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
                                    Settings.MessageVersion, Settings.ReliableMessagingVersion);
                                forceAck = true;
 
                                OnMessageDropped();
                            }
                        }
                        // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
                        // serialized ack size and the amount of memory taken by the ack ranges. In the
                        // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
                        // threat.
                        else if (_deliveryStrategy.CanEnqueue(sequenceNumber)
                            && (Settings.Ordered || _inputConnection.CanMerge(sequenceNumber)))
                        {
                            _inputConnection.Merge(sequenceNumber, isLast);
                            needDispatch = _deliveryStrategy.Enqueue(info.Message, sequenceNumber);
                            closeMessage = false;
                            oldAckVersion = _ackVersion;
                            _pendingAcknowledgements++;
 
                            if (_inputConnection.AllAdded)
                            {
                                scheduleShutdown = true;
 
                                if (OutputConnection.CheckForTermination())
                                {
                                    ReliableSession.CloseSession();
                                }
                            }
                        }
                        else
                        {
                            OnMessageDropped();
                        }
 
                        // if (ack now && we enqueued && an ack has been sent since we enqueued (and thus
                        // carries the sequence number of the message we just processed)) then we don't
                        // need to ack again.
                        if (_inputConnection.IsLastKnown || _pendingAcknowledgements == Settings.MaxTransferWindowSize)
                            tryAckNow = true;
 
                        bool startTimer = tryAckNow || (_pendingAcknowledgements > 0 && fault == null);
                        if (startTimer && !_acknowledgementScheduled)
                        {
                            _acknowledgementScheduled = true;
                            _acknowledgementTimer.Set(Settings.AcknowledgementInterval);
                        }
                    }
 
                    if (needDispatch)
                    {
                        Dispatch();
                    }
                }
                else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
                {
                    bool isTerminateEarly;
 
                    using (ThisAsyncLock.TakeLock())
                    {
                        isTerminateEarly = !_inputConnection.Terminate();
                    }
 
                    if (isTerminateEarly)
                    {
                        fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                            SRP.SequenceTerminatedEarlyTerminateSequence,
                            SRP.EarlyTerminateSequence);
                    }
                }
                else if (wsrm11)
                {
                    if (((info.TerminateSequenceInfo != null) && (info.TerminateSequenceInfo.Identifier == ReliableSession.InputID))
                        || (info.CloseSequenceInfo != null))
                    {
                        bool isTerminate = info.TerminateSequenceInfo != null;
                        WsrmRequestInfo requestInfo = isTerminate
                            ? (WsrmRequestInfo)info.TerminateSequenceInfo
                            : (WsrmRequestInfo)info.CloseSequenceInfo;
                        long last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
 
                        if (!WsrmUtilities.ValidateWsrmRequest(ReliableSession, requestInfo, Binder, null))
                        {
                            return;
                        }
 
                        bool isLastLargeEnough = true;
                        bool isLastConsistent = true;
 
                        using (ThisAsyncLock.TakeLock())
                        {
                            if (!_inputConnection.IsLastKnown)
                            {
                                if (isTerminate)
                                {
                                    if (_inputConnection.SetTerminateSequenceLast(last, out isLastLargeEnough))
                                    {
                                        scheduleShutdown = true;
                                    }
                                    else if (isLastLargeEnough)
                                    {
                                        remoteFaultException = new ProtocolException(SRP.EarlyTerminateSequence);
                                    }
                                }
                                else
                                {
                                    scheduleShutdown = _inputConnection.SetCloseSequenceLast(last);
                                    isLastLargeEnough = scheduleShutdown;
                                }
 
                                if (scheduleShutdown)
                                {
                                    ReliableSession.SetFinalAck(_inputConnection.Ranges);
                                    if (_terminateRequestor != null)
                                    {
                                        ReliableSession.CloseSession();
                                    }
 
                                    _deliveryStrategy.Dispose();
                                }
                            }
                            else
                            {
                                isLastConsistent = (last == _inputConnection.Last);
 
                                // Have seen CloseSequence already, TerminateSequence means cleanup.
                                if (isTerminate && isLastConsistent && _inputConnection.IsSequenceClosed)
                                {
                                    terminate = true;
                                }
                            }
                        }
 
                        if (!isLastLargeEnough)
                        {
                            string faultString = SRP.SequenceTerminatedSmallLastMsgNumber;
                            string exceptionString = SRP.SmallLastMsgNumberExceptionString;
                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
                        }
                        else if (!isLastConsistent)
                        {
                            string faultString = SRP.SequenceTerminatedInconsistentLastMsgNumber;
                            string exceptionString = SRP.InconsistentLastMsgNumberExceptionString;
                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
                        }
                        else
                        {
                            message = isTerminate
                                ? WsrmUtilities.CreateTerminateResponseMessage(Settings.MessageVersion,
                                requestInfo.MessageId, ReliableSession.InputID)
                                : WsrmUtilities.CreateCloseSequenceResponse(Settings.MessageVersion,
                                requestInfo.MessageId, ReliableSession.InputID);
                            forceAck = true;
                        }
                    }
                    else if (info.TerminateSequenceInfo != null)    // Identifier == OutputID
                    {
                        fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                            SRP.SequenceTerminatedUnsupportedTerminateSequence,
                            SRP.UnsupportedTerminateSequenceExceptionString);
                    }
                    else if (info.TerminateSequenceResponseInfo != null)
                    {
                        fault = ProcessCloseOrTerminateSequenceResponse(false, info);
                    }
                    else if (info.CloseSequenceResponseInfo != null)
                    {
                        fault = ProcessCloseOrTerminateSequenceResponse(true, info);
                    }
                    else if (final)
                    {
                        if (_closeRequestor == null)
                        {
                            string exceptionString = SRP.UnsupportedCloseExceptionString;
                            string faultString = SRP.SequenceTerminatedUnsupportedClose;
 
                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.OutputID, faultString,
                                exceptionString);
                        }
                        else
                        {
                            fault = WsrmUtilities.ValidateFinalAck(ReliableSession, info, OutputConnection.Last);
 
                            if (fault == null)
                            {
                                _closeRequestor.SetInfo(info);
                            }
                        }
                    }
                    else if (info.WsrmHeaderFault != null)
                    {
                        if (!(info.WsrmHeaderFault is UnknownSequenceFault))
                        {
                            throw Fx.AssertAndThrow("Fault must be UnknownSequence fault.");
                        }
 
                        if (_terminateRequestor == null)
                        {
                            throw Fx.AssertAndThrow("In wsrm11, if we start getting UnknownSequence, terminateRequestor cannot be null.");
                        }
 
                        _terminateRequestor.SetInfo(info);
                    }
                }
 
                if (fault != null)
                {
                    ReliableSession.OnLocalFault(fault.CreateException(), fault, null);
                    return;
                }
 
                if (scheduleShutdown)
                {
                    ActionItem.Schedule(ShutdownCallback, null);
                }
 
                if (message != null)
                {
                    if (forceAck)
                    {
                        WsrmUtilities.AddAcknowledgementHeader(Settings.ReliableMessagingVersion, message,
                            ReliableSession.InputID, _inputConnection.Ranges, true, GetBufferRemaining());
                    }
                    else if (tryAckNow)
                    {
                        AddPendingAcknowledgements(message);
                    }
                }
                else if (tryAckNow)
                {
                    using (ThisAsyncLock.TakeLock())
                    {
                        if (oldAckVersion != 0 && oldAckVersion != _ackVersion)
                            return;
 
                        if (_acknowledgementScheduled)
                        {
                            _acknowledgementTimer.Cancel();
                            _acknowledgementScheduled = false;
                        }
                        _pendingAcknowledgements = 0;
                    }
 
                    message = CreateAcknowledgmentMessage();
                }
 
                if (message != null)
                {
                    using (message)
                    {
                        if (_guard.Enter())
                        {
                            try
                            {
                                await Binder.SendAsync(message, DefaultSendTimeout);
                            }
                            finally
                            {
                                _guard.Exit();
                            }
                        }
                    }
                }
 
                if (terminate)
                {
                    using (ThisAsyncLock.TakeLock())
                    {
                        _inputConnection.Terminate();
                    }
                }
 
                if (remoteFaultException != null)
                {
                    ReliableSession.OnRemoteFault(remoteFaultException);
                }
            }
            finally
            {
                if (closeMessage)
                {
                    info.Message.Close();
                }
            }
        }
 
        private static void ProcessMessageStatic(object state)
        {
            (ReliableDuplexSessionChannel channel, WsrmMessageInfo info) = (ValueTuple<ReliableDuplexSessionChannel, WsrmMessageInfo>)state;
            _ = channel.ProcessMessageAsync(info);
        }
 
        protected abstract Task ProcessMessageAsync(WsrmMessageInfo info);
 
        protected override void OnAbort()
        {
            if (OutputConnection != null)
                OutputConnection.Abort(this);
 
            if (_inputConnection != null)
                _inputConnection.Abort(this);
 
            _guard.Abort();
 
            ReliableRequestor tempRequestor = _closeRequestor;
            if (tempRequestor != null)
            {
                tempRequestor.Abort(this);
            }
 
            tempRequestor = _terminateRequestor;
            if (tempRequestor != null)
            {
                tempRequestor.Abort(this);
            }
 
            ReliableSession.Abort();
        }
 
        private async Task OnAcknowledgementTimeoutElapsedAsync(object state)
        {
            await using (await ThisAsyncLock.TakeLockAsync())
            {
                _acknowledgementScheduled = false;
                _pendingAcknowledgements = 0;
 
                if (State == CommunicationState.Closing
                    || State == CommunicationState.Closed
                    || State == CommunicationState.Faulted)
                    return;
            }
 
            if (_guard.Enter())
            {
                try
                {
                    using (Message message = CreateAcknowledgmentMessage())
                    {
                        await Binder.SendAsync(message, DefaultSendTimeout);
                    }
                }
                finally
                {
                    _guard.Exit();
                }
            }
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return OnCloseAsync(timeout).ToApm(callback, state);
        }
 
        private void OnBinderException(IReliableChannelBinder sender, Exception exception)
        {
            if (exception is QuotaExceededException)
            {
                if (State == CommunicationState.Opening ||
                    State == CommunicationState.Opened ||
                    State == CommunicationState.Closing)
                {
                    ReliableSession.OnLocalFault(exception, SequenceTerminatedFault.CreateQuotaExceededFault(ReliableSession.OutputID), null);
                }
            }
            else
            {
                EnqueueAndDispatch(exception, null, false);
            }
        }
 
        private void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
        {
            Binder.Abort();
 
            if (State == CommunicationState.Opening ||
                State == CommunicationState.Opened ||
                State == CommunicationState.Closing)
            {
                exception = new CommunicationException(SRP.EarlySecurityFaulted, exception);
                ReliableSession.OnLocalFault(exception, (Message)null, null);
            }
        }
 
        // CloseOutputSession && Close: CloseOutputSession only closes the ReliableOutputConnection
        // from the Opened state, if it does, it must create the closeOutputWaitObject so that
        // close may properly synchronize. If no closeOutputWaitObject is present, Close may close
        // the ReliableOutputConnection safely since it is in the Closing state.
        protected internal override async Task OnCloseAsync(TimeSpan timeout)
        {
            ThrowIfCloseInvalid();
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            if (OutputConnection != null)
            {
                if (_closeOutputWaitObject != null)
                {
                    await _closeOutputWaitObject.WaitAsync(timeoutHelper.RemainingTime());
                }
                else
                {
                    await InternalCloseOutputSessionAsync(timeoutHelper.RemainingTime());
                }
 
                await _inputConnection.CloseAsync(timeoutHelper.RemainingTime());
            }
 
            await _guard.CloseAsync(timeoutHelper.RemainingTime());
            await ReliableSession.CloseAsync(timeoutHelper.RemainingTime());
            await Binder.CloseAsync(timeoutHelper.RemainingTime(), MaskingMode.Handled);
            await base.OnCloseAsync(timeoutHelper.RemainingTime());
        }
 
 
        // CloseOutputSession && Close: CloseOutputSession only closes the ReliableOutputConnection
        // from the Opened state, if it does, it must create the closeOutputWaitObject so that
        // close may properly synchronize. If no closeOutputWaitObject is present, Close may close
        // the ReliableOutputConnection safely since it is in the Closing state.
        protected override void OnClose(TimeSpan timeout)
        {
            TaskHelpers.WaitForCompletionNoSpin(OnCloseAsync(timeout));
        }
 
        protected async Task OnCloseOutputSessionAsync(TimeSpan timeout)
        {
            using (ThisAsyncLock.TakeLock())
            {
                ThrowIfNotOpened();
                ThrowIfFaulted();
 
                if ((State != CommunicationState.Opened)
                    || (_closeOutputWaitObject != null))
                {
                    return;
                }
 
                _closeOutputWaitObject = new InterruptibleWaitObject(false, true);
            }
 
            bool throwing = true;
 
            try
            {
                await InternalCloseOutputSessionAsync(timeout);
                throwing = false;
            }
            finally
            {
                if (throwing)
                {
                    ReliableSession.OnLocalFault(null, SequenceTerminatedFault.CreateCommunicationFault(ReliableSession.OutputID, SRP.CloseOutputSessionErrorReason, null), null);
                    _closeOutputWaitObject.Fault(this);
                }
                else
                {
                    _closeOutputWaitObject.Set();
                }
            }
        }
 
        protected override void OnClosed()
        {
            base.OnClosed();
 
            Binder.Faulted -= OnBinderFaulted;
            if (_deliveryStrategy != null)
                _deliveryStrategy.Dispose();
        }
 
        protected override void OnClosing()
        {
            base.OnClosing();
            _acknowledgementTimer.Cancel();
        }
 
        private void OnComponentFaulted(Exception faultException, WsrmFault fault)
        {
            ReliableSession.OnLocalFault(faultException, fault, null);
        }
 
        private void OnComponentException(Exception exception)
        {
            ReliableSession.OnUnknownException(exception);
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            result.ToApmEnd();
        }
 
        protected override void OnFaulted()
        {
            ReliableSession.OnFaulted();
            UnblockClose();
            base.OnFaulted();
        }
 
        protected override async Task OnSendAsync(Message message, TimeSpan timeout)
        {
            if (!await OutputConnection.AddMessageAsync(message, timeout, null))
                ThrowInvalidAddException();
        }
 
        private async Task OnSendAsyncHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException)
        {
            using (attemptInfo.Message)
            {
                if (attemptInfo.RetryCount > Settings.MaxRetryCount)
                {
                    ReliableSession.OnLocalFault(new CommunicationException(SRP.MaximumRetryCountExceeded, _maxRetryCountException),
                        SequenceTerminatedFault.CreateMaxRetryCountExceededFault(ReliableSession.OutputID), null);
                }
                else
                {
                    ReliableSession.OnLocalActivity();
                    AddPendingAcknowledgements(attemptInfo.Message);
 
                    MaskingMode maskingMode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;
 
                    if (attemptInfo.RetryCount < Settings.MaxRetryCount)
                    {
                        maskingMode |= MaskingMode.Handled;
                        await Binder.SendAsync(attemptInfo.Message, timeout, maskingMode);
                    }
                    else
                    {
                        try
                        {
                            await Binder.SendAsync(attemptInfo.Message, timeout, maskingMode);
                        }
                        catch (Exception e)
                        {
                            if (Fx.IsFatal(e))
                                throw;
 
                            if (Binder.IsHandleable(e))
                            {
                                _maxRetryCountException = e;
                            }
                            else
                            {
                                throw;
                            }
                        }
                    }
                }
            }
        }
 
        private async Task OnSendAckRequestedAsyncHandler(TimeSpan timeout)
        {
            ReliableSession.OnLocalActivity();
            using (Message message = WsrmUtilities.CreateAckRequestedMessage(Settings.MessageVersion,
                Settings.ReliableMessagingVersion, ReliableSession.OutputID))
            {
                await Binder.SendAsync(message, timeout, MaskingMode.Handled);
            }
        }
 
        protected virtual void OnMessageDropped()
        {
        }
 
        protected void SetConnections()
        {
            OutputConnection = new ReliableOutputConnection(ReliableSession.OutputID,
            Settings.MaxTransferWindowSize, Settings.MessageVersion,
            Settings.ReliableMessagingVersion, ReliableSession.InitiationTime, true, DefaultSendTimeout);
            OutputConnection.Faulted += OnComponentFaulted;
            OutputConnection.OnException += OnComponentException;
            OutputConnection.SendAsyncHandler = OnSendAsyncHandler;
            OutputConnection.SendAckRequestedAsyncHandler = OnSendAckRequestedAsyncHandler;
 
            _inputConnection = new ReliableInputConnection();
            _inputConnection.ReliableMessagingVersion = Settings.ReliableMessagingVersion;
 
            if (Settings.Ordered)
                _deliveryStrategy = new OrderedDeliveryStrategy<Message>(this, Settings.MaxTransferWindowSize, false);
            else
                _deliveryStrategy = new UnorderedDeliveryStrategy<Message>(this, Settings.MaxTransferWindowSize);
 
            _deliveryStrategy.DequeueCallback = OnDeliveryStrategyItemDequeued;
        }
 
        protected void SetSession(ChannelReliableSession session)
        {
            session.UnblockChannelCloseCallback = UnblockClose;
            ReliableSession = session;
        }
 
        private void OnDeliveryStrategyItemDequeued()
        {
            if (_advertisedZero)
                _ = OnAcknowledgementTimeoutElapsedAsync(null);
        }
 
        private static Task StartReceivingAsyncStatic(object state)
        {
            var thisPtr = (ReliableDuplexSessionChannel)state;
            return thisPtr.StartReceivingAsync();
        }
 
        protected async Task StartReceivingAsync()
        {
            try
            {
                while (true)
                {
                    (bool success, RequestContext context) = await Binder.TryReceiveAsync(TimeSpan.MaxValue);
                    if (success)
                    {
                        if (context == null)
                        {
                            bool terminated = false;
 
                            using (ThisAsyncLock.TakeLock())
                            {
                                terminated = _inputConnection.Terminate();
                            }
 
                            if (!terminated && (Binder.State == CommunicationState.Opened))
                            {
                                Exception e = new CommunicationException(SRP.EarlySecurityClose);
                                ReliableSession.OnLocalFault(e, (Message)null, null);
                            }
 
                            break; // End receive loop
                        }
 
                        Message message = context.RequestMessage;
                        context.Close();
 
                        WsrmMessageInfo info = WsrmMessageInfo.Get(Settings.MessageVersion,
                            Settings.ReliableMessagingVersion, Binder.Channel, Binder.GetInnerSession(),
                            message);
 
                        ActionItem.Schedule(s_processMessageStatic, (this, info));
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                    throw;
 
                ReliableSession.OnUnknownException(e);
            }
        }
 
        private void ShutdownCallback(object state)
        {
            Shutdown();
        }
 
        private async Task TerminateSequenceAsync(TimeSpan timeout)
        {
            ReliableMessagingVersion reliableMessagingVersion = Settings.ReliableMessagingVersion;
 
            if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                if (OutputConnection.CheckForTermination())
                {
                    ReliableSession.CloseSession();
                }
 
                Message message = WsrmUtilities.CreateTerminateMessage(Settings.MessageVersion,
                    reliableMessagingVersion, ReliableSession.OutputID);
                await Binder.SendAsync(message, timeout, MaskingMode.Handled);
            }
            else if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                CreateTerminateRequestor();
                await _terminateRequestor.RequestAsync(timeout);
                // reply came from receive loop, receive loop owns verified message so nothing more to do.
            }
            else
            {
                throw Fx.AssertAndThrow("Reliable messaging version not supported.");
            }
        }
 
        private void ThrowIfCloseInvalid()
        {
            bool shouldFault = false;
 
            if (Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                if (_deliveryStrategy.EnqueuedCount > 0 || _inputConnection.Ranges.Count > 1)
                {
                    shouldFault = true;
                }
            }
            else if (Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                if (_deliveryStrategy.EnqueuedCount > 0)
                {
                    shouldFault = true;
                }
            }
 
            if (shouldFault)
            {
                WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                    SRP.SequenceTerminatedSessionClosedBeforeDone, SRP.SessionClosedBeforeDone);
                ReliableSession.OnLocalFault(null, fault, null);
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
            }
        }
 
        private void ThrowInvalidAddException()
        {
            if (State == CommunicationState.Opened)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SRP.SendCannotBeCalledAfterCloseOutputSession));
            else if (State == CommunicationState.Faulted)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(GetTerminalException());
            else
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(CreateClosedException());
        }
 
        private void UnblockClose()
        {
            if (OutputConnection != null)
            {
                OutputConnection.Fault(this);
            }
 
            if (_inputConnection != null)
            {
                _inputConnection.Fault(this);
            }
 
            ReliableRequestor tempRequestor = _closeRequestor;
            if (tempRequestor != null)
            {
                tempRequestor.Fault(this);
            }
 
            tempRequestor = _terminateRequestor;
            if (tempRequestor != null)
            {
                tempRequestor.Fault(this);
            }
        }
    }
 
    internal class ClientReliableDuplexSessionChannel : ReliableDuplexSessionChannel
    {
        private ChannelParameterCollection _channelParameters;
        private DuplexClientReliableSession _clientSession;
        private TimeoutHelper _closeTimeoutHelper;
        private bool _closing;
        private static Func<object, Task> s_onReconnectTimerElapsed = new Func<object, Task>(OnReconnectTimerElapsed);
 
        public ClientReliableDuplexSessionChannel(ChannelManagerBase factory, IReliableFactorySettings settings,
            IReliableChannelBinder binder, FaultHelper faultHelper,
            LateBoundChannelParameterCollection channelParameters, UniqueId inputID)
            : base(factory, settings, binder)
        {
            _clientSession = new DuplexClientReliableSession(this, settings, faultHelper, inputID);
            _clientSession.PollingCallback = PollingAsyncCallback;
            SetSession(_clientSession);
 
            _channelParameters = channelParameters;
            channelParameters.SetChannel(this);
            ((IClientReliableChannelBinder)binder).ConnectionLost += OnConnectionLost;
        }
 
        public override T GetProperty<T>()
        {
            if (typeof(T) == typeof(ChannelParameterCollection))
            {
                return (T)(object)_channelParameters;
            }
 
            return base.GetProperty<T>();
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return OnCloseAsync(timeout).ToApm(callback, state);
        }
 
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return OnOpenAsync(timeout).ToApm(callback, state);
        }
 
        protected internal override Task OnCloseAsync(TimeSpan timeout)
        {
            _closeTimeoutHelper = new TimeoutHelper(timeout);
            _closing = true;
            return base.OnCloseAsync(timeout);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            TaskHelpers.WaitForCompletion(OnCloseAsync(timeout));
        }
 
        private async void OnConnectionLost(object sender, EventArgs args)
        {
            await using (await ThisAsyncLock.TakeLockAsync())
            {
                if ((State == CommunicationState.Opened || State == CommunicationState.Closing) &&
                    !Binder.Connected && _clientSession.StopPolling())
                {
 
                    if (WcfEventSource.Instance.ClientReliableSessionReconnectIsEnabled())
                    {
                        WcfEventSource.Instance.ClientReliableSessionReconnect(_clientSession.Id);
                    }
 
                    await ReconnectAsync();
                }
            }
        }
 
        protected override void OnEndOpen(IAsyncResult result)
        {
            result.ToApmEnd();
        }
 
        protected internal override async Task OnOpenAsync(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            bool throwing = true;
 
            try
            {
                await Binder.OpenAsync(timeoutHelper.RemainingTime());
                await ReliableSession.OpenAsync(timeoutHelper.RemainingTime());
                throwing = false;
            }
            finally
            {
                if (throwing)
                {
                    await Binder.CloseAsync(timeoutHelper.RemainingTime());
                }
            }
        }
 
        protected override void OnOpen(TimeSpan timeout)
        {
            TaskHelpers.WaitForCompletion(OnOpenAsync(timeout));
        }
 
        protected override void OnOpened()
        {
            base.OnOpened();
            SetConnections();
            ActionItem.Schedule(s_startReceivingAsyncStatic, this);
        }
 
        private static async Task OnReconnectTimerElapsed(object state)
        {
            ClientReliableDuplexSessionChannel channel = (ClientReliableDuplexSessionChannel)state;
            await using (await channel.ThisAsyncLock.TakeLockAsync())
            {
                if ((channel.State == CommunicationState.Opened || channel.State == CommunicationState.Closing) &&
                    !channel.Binder.Connected)
                {
                    await channel.ReconnectAsync();
                }
                else
                {
                    channel._clientSession.ResumePolling(channel.OutputConnection.Strategy.QuotaRemaining == 0);
                }
            }
        }
 
        protected override void OnRemoteActivity()
        {
            ReliableSession.OnRemoteActivity(OutputConnection.Strategy.QuotaRemaining == 0);
        }
 
        private async Task PollingAsyncCallback()
        {
            using (Message message = WsrmUtilities.CreateAckRequestedMessage(Settings.MessageVersion,
                Settings.ReliableMessagingVersion, ReliableSession.OutputID))
            {
                await Binder.SendAsync(message, DefaultSendTimeout);
            }
        }
 
        protected override Task ProcessMessageAsync(WsrmMessageInfo info)
        {
            if (!ReliableSession.ProcessInfo(info, null))
                return Task.CompletedTask;
 
            if (!ReliableSession.VerifyDuplexProtocolElements(info, null))
                return Task.CompletedTask;
 
            return ProcessDuplexMessageAsync(info);
        }
 
        private async Task ReconnectAsync()
        {
            bool handleException = true;
 
            try
            {
                Message message = WsrmUtilities.CreateAckRequestedMessage(Settings.MessageVersion,
                    Settings.ReliableMessagingVersion, ReliableSession.OutputID);
                TimeSpan timeout = _closing ? _closeTimeoutHelper.RemainingTime() : DefaultCloseTimeout;
                await Binder.SendAsync(message, timeout);
                handleException = false;
 
                using (ThisAsyncLock.TakeLock())
                {
                    if (Binder.Connected)
                        _clientSession.ResumePolling(OutputConnection.Strategy.QuotaRemaining == 0);
                    else
                        WaitForReconnect();
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                    throw;
 
                if (handleException)
                    WaitForReconnect();
                else
                    throw;
            }
        }
 
        // If anything throws out of this method, we'll consider it fatal.
        private void WaitForReconnect()
        {
            // It might be worth considering replacing this with Task.Delay and awaiting the task
            // but leaving for now to minimize code churn in porting.
            TimeSpan timeout;
 
            if (_closing)
                timeout = TimeoutHelper.Divide(_closeTimeoutHelper.RemainingTime(), 2);
            else
                timeout = TimeoutHelper.Divide(DefaultSendTimeout, 2);
 
            IOThreadTimer timer = new IOThreadTimer(s_onReconnectTimerElapsed, this, false);
            timer.Set(timeout);
        }
 
        private class DuplexClientReliableSession : ClientReliableSession, IDuplexSession
        {
            private ClientReliableDuplexSessionChannel channel;
 
            public DuplexClientReliableSession(ClientReliableDuplexSessionChannel channel,
                IReliableFactorySettings settings, FaultHelper helper, UniqueId inputID)
                : base(channel, settings, (IClientReliableChannelBinder)channel.Binder, helper, inputID)
            {
                this.channel = channel;
            }
 
            public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state)
            {
                return BeginCloseOutputSession(channel.DefaultCloseTimeout, callback, state);
            }
 
            public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return channel.OnCloseOutputSessionAsync(timeout).ToApm(callback, state);
            }
 
            public void EndCloseOutputSession(IAsyncResult result)
            {
                result.ToApmEnd();
            }
 
            public void CloseOutputSession()
            {
                CloseOutputSession(channel.DefaultCloseTimeout);
            }
 
            public void CloseOutputSession(TimeSpan timeout)
            {
                TaskHelpers.WaitForCompletionNoSpin(channel.OnCloseOutputSessionAsync(timeout));
            }
        }
    }
}