File: System\ServiceModel\Dispatcher\BufferedReceiveBinder.cs
Web Access
Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
 
using System.Runtime;
using System.ServiceModel.Channels;
using System.Threading;
 
namespace System.ServiceModel.Dispatcher
{
    internal class BufferedReceiveBinder : IChannelBinder
    {
        private static Action<object> s_tryReceive = new Action<object>(BufferedReceiveBinder.TryReceive);
        private static AsyncCallback s_tryReceiveCallback = Fx.ThunkCallback(new AsyncCallback(TryReceiveCallback));
 
        private IChannelBinder _channelBinder;
        private InputQueue<RequestContextWrapper> _inputQueue;
        private int _pendingOperationSemaphore;
 
        public BufferedReceiveBinder(IChannelBinder channelBinder)
        {
            _channelBinder = channelBinder;
            _inputQueue = new InputQueue<RequestContextWrapper>();
        }
 
        public IChannel Channel
        {
            get { return _channelBinder.Channel; }
        }
 
        public bool HasSession
        {
            get { return _channelBinder.HasSession; }
        }
 
        public Uri ListenUri
        {
            get { return _channelBinder.ListenUri; }
        }
 
        public EndpointAddress LocalAddress
        {
            get { return _channelBinder.LocalAddress; }
        }
 
        public EndpointAddress RemoteAddress
        {
            get { return _channelBinder.RemoteAddress; }
        }
 
        public void Abort()
        {
            _inputQueue.Close();
            _channelBinder.Abort();
        }
 
        public void CloseAfterFault(TimeSpan timeout)
        {
            _inputQueue.Close();
            _channelBinder.CloseAfterFault(timeout);
        }
 
        // Locking:
        // Only 1 channelBinder operation call should be active at any given time. All future calls
        // will wait on the inputQueue. The semaphore is always released right before the Dispatch on the inputQueue.
        // This protects a new call racing with an existing operation that is just about to fully complete.
 
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            if (Interlocked.CompareExchange(ref _pendingOperationSemaphore, 1, 0) == 0)
            {
                ActionItem.Schedule(s_tryReceive, this);
            }
 
            RequestContextWrapper wrapper;
            bool success = _inputQueue.Dequeue(timeout, out wrapper);
 
            if (success && wrapper != null)
            {
                requestContext = wrapper.RequestContext;
            }
            else
            {
                requestContext = null;
            }
 
            return success;
        }
 
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (Interlocked.CompareExchange(ref _pendingOperationSemaphore, 1, 0) == 0)
            {
                IAsyncResult result = _channelBinder.BeginTryReceive(timeout, s_tryReceiveCallback, this);
                if (result.CompletedSynchronously)
                {
                    HandleEndTryReceive(result);
                }
            }
 
            return _inputQueue.BeginDequeue(timeout, callback, state);
        }
 
        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            RequestContextWrapper wrapper;
            bool success = _inputQueue.EndDequeue(result, out wrapper);
 
            if (success && wrapper != null)
            {
                requestContext = wrapper.RequestContext;
            }
            else
            {
                requestContext = null;
            }
            return success;
        }
 
        public RequestContext CreateRequestContext(Message message)
        {
            return _channelBinder.CreateRequestContext(message);
        }
 
        public void Send(Message message, TimeSpan timeout)
        {
            _channelBinder.Send(message, timeout);
        }
 
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return _channelBinder.BeginSend(message, timeout, callback, state);
        }
 
        public void EndSend(IAsyncResult result)
        {
            _channelBinder.EndSend(result);
        }
 
        public Message Request(Message message, TimeSpan timeout)
        {
            return _channelBinder.Request(message, timeout);
        }
 
        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return _channelBinder.BeginRequest(message, timeout, callback, state);
        }
 
        public Message EndRequest(IAsyncResult result)
        {
            return _channelBinder.EndRequest(result);
        }
 
        public bool WaitForMessage(TimeSpan timeout)
        {
            return _channelBinder.WaitForMessage(timeout);
        }
 
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return _channelBinder.BeginWaitForMessage(timeout, callback, state);
        }
 
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return _channelBinder.EndWaitForMessage(result);
        }
 
        internal void InjectRequest(RequestContext requestContext)
        {
            // Reuse the existing requestContext
            _inputQueue.EnqueueAndDispatch(new RequestContextWrapper(requestContext));
        }
 
        //
        // TryReceive threads
        //
 
        private static void TryReceive(object state)
        {
            BufferedReceiveBinder binder = (BufferedReceiveBinder)state;
 
            RequestContext requestContext;
            bool requiresDispatch = false;
            try
            {
                if (binder._channelBinder.TryReceive(TimeSpan.MaxValue, out requestContext))
                {
                    requiresDispatch = binder._inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                requiresDispatch = binder._inputQueue.EnqueueWithoutDispatch(exception, null);
            }
            finally
            {
                Interlocked.Exchange(ref binder._pendingOperationSemaphore, 0);
                if (requiresDispatch)
                {
                    binder._inputQueue.Dispatch();
                }
            }
        }
 
        private static void TryReceiveCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            HandleEndTryReceive(result);
        }
 
        private static void HandleEndTryReceive(IAsyncResult result)
        {
            BufferedReceiveBinder binder = (BufferedReceiveBinder)result.AsyncState;
 
            RequestContext requestContext;
            bool requiresDispatch = false;
            try
            {
                if (binder._channelBinder.EndTryReceive(result, out requestContext))
                {
                    requiresDispatch = binder._inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                requiresDispatch = binder._inputQueue.EnqueueWithoutDispatch(exception, null);
            }
            finally
            {
                Interlocked.Exchange(ref binder._pendingOperationSemaphore, 0);
                if (requiresDispatch)
                {
                    binder._inputQueue.Dispatch();
                }
            }
        }
 
        // A RequestContext may be 'null' (some pieces of ChannelHandler depend on this) but the InputQueue
        // will not allow null items to be enqueued. Wrap the RequestContexts in another object to
        // facilitate this semantic
        private class RequestContextWrapper
        {
            public RequestContextWrapper(RequestContext requestContext)
            {
                RequestContext = requestContext;
            }
 
            public RequestContext RequestContext
            {
                get;
                private set;
            }
        }
    }
}