File: System\ServiceModel\Channels\ReliableOutputConnection.cs
Web Access
Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
 
namespace System.ServiceModel.Channels
{
    internal delegate Task SendAsyncHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException);
    internal delegate void ComponentFaultedHandler(Exception faultException, WsrmFault fault);
    internal delegate void ComponentExceptionHandler(Exception exception);
    internal delegate Task RetryHandler(MessageAttemptInfo attemptInfo);
 
    internal sealed class ReliableOutputConnection
    {
        private static Func<object, Task> s_sendRetries = new Func<object, Task>(SendRetries);
 
        private UniqueId _id;
        private ReliableMessagingVersion _reliableMessagingVersion;
        private Guard _sendGuard = new Guard(int.MaxValue);
        private SendAsyncHandler _sendAsyncHandler;
        private OperationWithTimeoutAsyncCallback _sendAckRequestedAsyncHandler;
        private TimeSpan _sendTimeout;
        private InterruptibleWaitObject _shutdownHandle = new InterruptibleWaitObject(false);
        private bool _terminated = false;
 
        public ReliableOutputConnection(UniqueId id,
            int maxTransferWindowSize,
            MessageVersion messageVersion,
            ReliableMessagingVersion reliableMessagingVersion,
            TimeSpan initialRtt,
            bool requestAcks,
            TimeSpan sendTimeout)
        {
            _id = id;
            MessageVersion = messageVersion;
            _reliableMessagingVersion = reliableMessagingVersion;
            _sendTimeout = sendTimeout;
            Strategy = new TransmissionStrategy(reliableMessagingVersion, initialRtt, maxTransferWindowSize, requestAcks, id);
            Strategy.RetryTimeoutElapsed = OnRetryTimeoutElapsed;
            Strategy.OnException = RaiseOnException;
        }
 
        public ComponentFaultedHandler Faulted;
        public ComponentExceptionHandler OnException;
 
        private MessageVersion MessageVersion { get; }
        public bool Closed { get; private set; } = false;
        public long Last => Strategy.Last;
        public SendAsyncHandler SendAsyncHandler { set => _sendAsyncHandler = value; }
        public OperationWithTimeoutAsyncCallback SendAckRequestedAsyncHandler { set => _sendAckRequestedAsyncHandler = value; }
        public TransmissionStrategy Strategy { get; }
        private object ThisLock { get; } = new object();
 
        public void Abort(ChannelBase channel)
        {
            _sendGuard.Abort();
            _shutdownHandle.Abort(channel);
            Strategy.Abort(channel);
        }
 
        private Task CompleteTransferAsync(TimeSpan timeout)
        {
            if (_reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                Message message = Message.CreateMessage(MessageVersion, WsrmFeb2005Strings.LastMessageAction);
                message.Properties.AllowOutputBatching = false;
 
                // Return value ignored.
                return InternalAddMessageAsync(message, timeout, null, true);
            }
            else if (_reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                if (Strategy.SetLast())
                {
                    _shutdownHandle.Set();
                }
                else
                {
                    return _sendAckRequestedAsyncHandler(timeout);
                }
            }
            else
            {
                throw Fx.AssertAndThrow("Unsupported version.");
            }
 
            return Task.CompletedTask;
        }
 
        public Task<bool> AddMessageAsync(Message message, TimeSpan timeout, object state)
        {
            return InternalAddMessageAsync(message, timeout, state, false);
        }
 
        public bool CheckForTermination()
        {
            return Strategy.DoneTransmitting;
        }
 
        public async Task CloseAsync(TimeSpan timeout)
        {
            bool completeTransfer = false;
 
            lock (ThisLock)
            {
                completeTransfer = !Closed;
                Closed = true;
            }
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            if (completeTransfer)
            {
                await CompleteTransferAsync(timeoutHelper.RemainingTime());
            }
 
            await _shutdownHandle.WaitAsync(timeoutHelper.RemainingTime());
            await _sendGuard.CloseAsync(timeoutHelper.RemainingTime());
            Strategy.Close();
        }
 
        public void Fault(ChannelBase channel)
        {
            _sendGuard.Abort();
            _shutdownHandle.Fault(channel);
            Strategy.Fault(channel);
        }
 
        private async Task<bool> InternalAddMessageAsync(Message message, TimeSpan timeout, object state, bool isLast)
        {
            MessageAttemptInfo attemptInfo;
            TimeoutHelper helper = new TimeoutHelper(timeout);
 
            try
            {
                if (isLast)
                {
                    if (state != null)
                    {
                        throw Fx.AssertAndThrow("The isLast overload does not take a state.");
                    }
 
                    attemptInfo = await Strategy.AddLastAsync(message, helper.RemainingTime(), null);
                }
                else
                {
                    bool success;
                    (attemptInfo, success) = await Strategy.AddAsync(message, helper.RemainingTime(), state);
                    if (!success)
                    {
                        return false;
                    }
                }
            }
            catch (TimeoutException)
            {
                if (isLast)
                    RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(_id, SRP.SequenceTerminatedAddLastToWindowTimedOut, null));
                // else - RM does not fault the channel based on a timeout exception trying to add a sequenced message to the window.
 
                throw;
            }
            catch (Exception e)
            {
                if (!Fx.IsFatal(e))
                    RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(_id, SRP.SequenceTerminatedUnknownAddToWindowError, null));
 
                throw;
            }
 
            if (_sendGuard.Enter())
            {
                try
                {
                    await _sendAsyncHandler(attemptInfo, helper.RemainingTime(), false);
                }
                catch (QuotaExceededException)
                {
                    RaiseFault(null, SequenceTerminatedFault.CreateQuotaExceededFault(_id));
                    throw;
                }
                finally
                {
                    _sendGuard.Exit();
                }
            }
 
            return true;
        }
 
        public bool IsFinalAckConsistent(SequenceRangeCollection ranges)
        {
            return Strategy.IsFinalAckConsistent(ranges);
        }
 
        private async Task OnRetryTimeoutElapsed(MessageAttemptInfo attemptInfo)
        {
            if (_sendGuard.Enter())
            {
                try
                {
                    await _sendAsyncHandler(attemptInfo, _sendTimeout, true);
                }
                finally
                {
                    _sendGuard.Exit();
                }
            }
        }
 
        private void OnTransferComplete()
        {
            Strategy.DequeuePending();
 
            if (Strategy.DoneTransmitting)
                Terminate();
        }
 
        public void ProcessTransferred(long transferred, SequenceRangeCollection ranges, int quotaRemaining)
        {
            if (transferred < 0)
            {
                throw Fx.AssertAndThrow("Argument transferred must be a valid sequence number or 0 for protocol messages.");
            }
 
            bool invalidAck;
 
            // ignored, TransmissionStrategy is being used to keep track of what must be re-sent.
            // In the Request-Reply case this state may not align with acks.
            bool inconsistentAck;
 
            Strategy.ProcessAcknowledgement(ranges, out invalidAck, out inconsistentAck);
            invalidAck = (invalidAck || ((transferred != 0) && !ranges.Contains(transferred)));
 
            if (!invalidAck)
            {
                if ((transferred > 0) && Strategy.ProcessTransferred(transferred, quotaRemaining))
                {
                    ActionItem.Schedule(s_sendRetries, this);
                }
                else
                {
                    OnTransferComplete();
                }
            }
            else
            {
                WsrmFault fault = new InvalidAcknowledgementFault(_id, ranges);
                RaiseFault(fault.CreateException(), fault);
            }
        }
 
        public void ProcessTransferred(SequenceRangeCollection ranges, int quotaRemaining)
        {
            bool invalidAck;
            bool inconsistentAck;
 
            Strategy.ProcessAcknowledgement(ranges, out invalidAck, out inconsistentAck);
 
            if (!invalidAck && !inconsistentAck)
            {
                if (Strategy.ProcessTransferred(ranges, quotaRemaining))
                {
                    ActionItem.Schedule(s_sendRetries, this);
                }
                else
                {
                    OnTransferComplete();
                }
            }
            else
            {
                WsrmFault fault = new InvalidAcknowledgementFault(_id, ranges);
                RaiseFault(fault.CreateException(), fault);
            }
        }
 
        private void RaiseFault(Exception faultException, WsrmFault fault)
        {
            ComponentFaultedHandler handler = Faulted;
 
            if (handler != null)
                handler(faultException, fault);
        }
 
        private void RaiseOnException(Exception exception)
        {
            OnException?.Invoke(exception);
        }
 
        private async Task SendRetries()
        {
            try
            {
                while (true)
                {
                    if (_sendGuard.Enter())
                    {
                        try
                        {
                            MessageAttemptInfo attemptInfo = Strategy.GetMessageInfoForRetry(false);
                            if (attemptInfo.Message == null)
                            {
                                break;
                            }
 
                            await _sendAsyncHandler(attemptInfo, _sendTimeout, true);
                        }
                        finally
                        {
                            _sendGuard.Exit();
                        }
 
                        Strategy.DequeuePending();
                        OnTransferComplete();
                    }
                    else
                    {
                        return;
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                    throw;
 
                RaiseOnException(e);
            }
        }
 
        private static Task SendRetries(object state)
        {
            ReliableOutputConnection outputConnection = (ReliableOutputConnection)state;
            return outputConnection.SendRetries();
        }
 
        public void Terminate()
        {
            lock (ThisLock)
            {
                if (_terminated)
                    return;
 
                _terminated = true;
            }
 
            _shutdownHandle.Set();
        }
    }
}