File: System\ServiceModel\Dispatcher\ListenerHandler.cs
Web Access
Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
 
using System.Runtime;
using System.ServiceModel.Channels;
using System.Threading.Tasks;
using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;
 
namespace System.ServiceModel.Dispatcher
{
    internal class ListenerHandler : CommunicationObject
    {
        private static Action<object> s_initiateChannelPump = new Action<object>(ListenerHandler.InitiateChannelPump);
        private SessionIdleManager _idleManager;
        private bool _acceptedNull;
        private bool _doneAccepting;
        private readonly IListenerBinder _listenerBinder;
        private IDefaultCommunicationTimeouts _timeouts;
 
        internal ListenerHandler(IListenerBinder listenerBinder, ChannelDispatcher channelDispatcher, IDefaultCommunicationTimeouts timeouts)
        {
            _listenerBinder = listenerBinder;
            if (!(_listenerBinder != null))
            {
                Fx.Assert("ListenerHandler.ctor: (this.listenerBinder != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull(nameof(listenerBinder));
            }
 
            ChannelDispatcher = channelDispatcher;
            if (!(ChannelDispatcher != null))
            {
                Fx.Assert("ListenerHandler.ctor: (this.channelDispatcher != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull(nameof(channelDispatcher));
            }
 
            _timeouts = timeouts;
            Endpoints = channelDispatcher.EndpointDispatcherTable;
        }
 
        internal ChannelDispatcher ChannelDispatcher { get; }
 
        internal ListenerChannel Channel { get; private set; }
 
        protected override TimeSpan DefaultCloseTimeout
        {
            get { return ServiceDefaults.CloseTimeout; }
        }
 
        protected override TimeSpan DefaultOpenTimeout
        {
            get { return ServiceDefaults.OpenTimeout; }
        }
 
        internal EndpointDispatcherTable Endpoints { get; set; }
 
        new internal object ThisLock
        {
            get { return base.ThisLock; }
        }
 
        protected internal override async Task OnCloseAsync(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            // if there's an idle manager that has not been cancelled, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            ChannelDispatcher.Channels.CloseInput();
 
            // Start closing existing channels
            await CloseChannelsAsync(timeoutHelper.RemainingTime());
        }
 
        protected internal override Task OnOpenAsync(TimeSpan timeout)
        {
            OnOpen(timeout);
            return TaskHelpers.CompletedTask();
        }
 
        protected override void OnOpen(TimeSpan timeout)
        {
        }
 
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnOpened()
        {
            base.OnOpened();
            ChannelDispatcher.Channels.IncrementActivityCount();
            NewChannelPump();
        }
 
        internal void NewChannelPump()
        {
            ActionItem.Schedule(ListenerHandler.s_initiateChannelPump, this);
        }
 
        private static void InitiateChannelPump(object state)
        {
            ListenerHandler listenerHandler = state as ListenerHandler;
            listenerHandler.ChannelPump();
        }
 
        private void ChannelPump()
        {
            IChannelListener listener = _listenerBinder.Listener;
 
            for (; ; )
            {
                if (_acceptedNull || (listener.State == CommunicationState.Faulted))
                {
                    DoneAccepting();
                    break;
                }
 
                Dispatch();
            }
        }
 
        private void AbortChannels()
        {
            IChannel[] channels = ChannelDispatcher.Channels.ToArray();
            for (int index = 0; index < channels.Length; index++)
            {
                channels[index].Abort();
            }
        }
 
        private async Task CloseChannelAsync(IChannel channel, TimeSpan timeout)
        {
            try
            {
                if (channel.State != CommunicationState.Closing && channel.State != CommunicationState.Closed)
                {
                    CloseChannelState state = new CloseChannelState(this, channel);
                    if (channel is ISessionChannel<IDuplexSession>)
                    {
                        IDuplexSession duplexSession = ((ISessionChannel<IDuplexSession>)channel).Session;
                        await Task.Factory.FromAsync(duplexSession.BeginCloseOutputSession, duplexSession.EndCloseOutputSession, timeout, null);
                    }
                    else
                    {
                        await channel.CloseHelperAsync(timeout);
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                HandleError(e);
 
                if (channel is ISessionChannel<IDuplexSession>)
                {
                    channel.Abort();
                }
            }
        }
 
        public void CloseInput(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            // Close all datagram channels
            IChannel[] channels = ChannelDispatcher.Channels.ToArray();
            for (int index = 0; index < channels.Length; index++)
            {
                IChannel channel = channels[index];
                if (!IsSessionChannel(channel))
                {
                    try
                    {
                        channel.Close(timeoutHelper.RemainingTime());
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
                        HandleError(e);
                    }
                }
            }
        }
 
        private async Task CloseChannelsAsync(TimeSpan timeout)
        {
            // Call CloseChannel on each channel without waiting for the result so that
            // all the channels are closed concurrently. Then wait for all the close Tasks
            // to complete. This replaces the NetFx implementation which called close
            // asynchronously using BeginClose and the callback reduced the busy count on
            // LifetimeManager and we waited for a waiter to signal when all the channels
            // had closed.
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            IChannel[] channels = ChannelDispatcher.Channels.ToArray();
            Task[] closeTasks = new Task[channels.Length];
            for (int index = 0; index < channels.Length; index++)
            {
                closeTasks[index] = CloseChannelAsync(channels[index], timeoutHelper.RemainingTime());
            }
            await Task.WhenAll(closeTasks);
        }
 
        private void Dispatch()
        {
            ListenerChannel channel = Channel;
            SessionIdleManager idleManager = _idleManager;
            Channel = null;
            _idleManager = null;
 
            try
            {
                if (channel != null)
                {
                    ChannelHandler handler = new ChannelHandler(_listenerBinder.MessageVersion, channel.Binder, this, idleManager);
 
                    if (!channel.Binder.HasSession)
                    {
                        ChannelDispatcher.Channels.Add(channel.Binder.Channel);
                    }
 
                    if (channel.Binder is DuplexChannelBinder)
                    {
                        DuplexChannelBinder duplexChannelBinder = channel.Binder as DuplexChannelBinder;
                        duplexChannelBinder.ChannelHandler = handler;
                        duplexChannelBinder.DefaultCloseTimeout = DefaultCloseTimeout;
 
                        if (_timeouts == null)
                        {
                            duplexChannelBinder.DefaultSendTimeout = ServiceDefaults.SendTimeout;
                        }
                        else
                        {
                            duplexChannelBinder.DefaultSendTimeout = _timeouts.SendTimeout;
                        }
                    }
 
                    ChannelHandler.Register(handler);
                    channel = null;
                    idleManager = null;
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                HandleError(e);
            }
            finally
            {
                if (channel != null)
                {
                    channel.Binder.Channel.Abort();
                    if (idleManager != null)
                    {
                        idleManager.CancelTimer();
                    }
                }
            }
        }
 
        private void AcceptedNull()
        {
            _acceptedNull = true;
        }
 
        private void DoneAccepting()
        {
            lock (ThisLock)
            {
                if (!_doneAccepting)
                {
                    _doneAccepting = true;
                    ChannelDispatcher.Channels.DecrementActivityCount();
                }
            }
        }
 
        private bool IsSessionChannel(IChannel channel)
        {
            return (channel is ISessionChannel<IDuplexSession> ||
                    channel is ISessionChannel<IInputSession> ||
                    channel is ISessionChannel<IOutputSession>);
        }
 
        private void CancelPendingIdleManager()
        {
            SessionIdleManager idleManager = _idleManager;
            if (idleManager != null)
            {
                idleManager.CancelTimer();
            }
        }
 
        protected override void OnAbort()
        {
            // if there's an idle manager that has not been transferred to the channel handler, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            ChannelDispatcher.Channels.CloseInput();
 
            // Abort existing channels
            AbortChannels();
 
            // Wait for channels to finish aborting
            ChannelDispatcher.Channels.Abort();
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return OnCloseAsync(timeout).ToApm(callback, state);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            // if there's an idle manager that has not been cancelled, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            ChannelDispatcher.Channels.CloseInput();
 
            // Start closing existing channels
            // The original implementation would close the channels asynchronously then wait
            // on a busy waiter for them all to close, which meant blocking the thread. This
            // still blocks the thread but does it with less complicated overhead
            CloseChannelsAsync(timeoutHelper.RemainingTime()).WaitForCompletion();
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            result.ToApmEnd();
        }
 
        private bool HandleError(Exception e)
        {
            return ChannelDispatcher.HandleError(e);
        }
 
        internal class CloseChannelState
        {
            private IChannel _channel;
 
            internal CloseChannelState(ListenerHandler listenerHandler, IChannel channel)
            {
                ListenerHandler = listenerHandler;
                _channel = channel;
            }
 
            internal ListenerHandler ListenerHandler { get; }
 
            internal IChannel Channel
            {
                get { return _channel; }
            }
        }
    }
 
    internal class ListenerChannel
    {
        public ListenerChannel(IChannelBinder binder)
        {
            Binder = binder;
        }
 
        public IChannelBinder Binder { get; }
    }
}