File: System\ServiceModel\Channels\BufferedConnection.cs
Web Access
Project: src\src\System.ServiceModel.NetFramingBase\src\System.ServiceModel.NetFramingBase.csproj (System.ServiceModel.NetFramingBase)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.ServiceModel.Channels
{
    internal class BufferedConnection : DelegatingConnection
    {
        private Memory<byte> _writeBuffer;
        private int _writeBufferSize;
        private int _pendingWriteSize;
        private Exception _pendingWriteException;
        private Timer _flushTimer;
        private long _flushTimeout;
        private TimeSpan _pendingTimeout;
        private const int MaxFlushSkew = 100;
 
        public BufferedConnection(IConnection connection, TimeSpan flushTimeout, int writeBufferSize)
            : base(connection)
        {
            _flushTimeout = Ticks.FromTimeSpan(flushTimeout);
            _writeBufferSize = writeBufferSize;
        }
 
        private SemaphoreSlim ThisLock { get; } = new SemaphoreSlim(1);
 
        private void CancelFlushTimer()
        {
            if (_flushTimer != null)
            {
                _flushTimer.Change(Timeout.Infinite, Timeout.Infinite);
                _pendingTimeout = TimeSpan.Zero;
            }
        }
 
        private async ValueTask FlushAsync(TimeSpan timeout)
        {
            ThrowPendingWriteException();
 
            await ThisLock.WaitAsync();
            try
            {
                await FlushCoreAsync(timeout);
            }
            finally
            {
                ThisLock.Release();
            }
        }
 
        private async ValueTask FlushCoreAsync(TimeSpan timeout)
        {
            if (_pendingWriteSize > 0)
            {
                await Connection.WriteAsync(_writeBuffer.Slice(0, _pendingWriteSize), false, timeout);
                _pendingWriteSize = 0;
            }
        }
 
        private static void OnFlushTimer(object state)
        {
            var bufferedConnection = (BufferedConnection)state;
            bufferedConnection.OnFlushTimerCore();
        }
 
        private async void OnFlushTimerCore()
        {
            await ThisLock.WaitAsync();
            try
            {
                try
                {
                    await FlushCoreAsync(_pendingTimeout);
                    _pendingTimeout = TimeSpan.Zero;
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    _pendingWriteException = e;
                    CancelFlushTimer();
                }
            }
            finally
            {
                ThisLock.Release();
            }
        }
 
        private void SetFlushTimer()
        {
            if (_flushTimer == null)
            {
                _flushTimer = new Timer(new TimerCallback(OnFlushTimer), this, _flushTimeout, Timeout.Infinite);
            }
            else
            {
                _flushTimer.Change(_flushTimeout, Timeout.Infinite);
            }
        }
 
        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool immediate, TimeSpan timeout)
        {
            if (buffer.IsEmpty)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException(nameof(buffer.Length), buffer.Length, SR.Format(
                    SR.ValueMustBePositive)));
            }
 
            ThrowPendingWriteException();
 
            if (immediate || _flushTimeout == 0)
            {
                return WriteNowAsync(buffer, timeout);
            }
            else
            {
                return WriteLaterAsync(buffer, timeout);
            }
 
        }
 
        private async ValueTask WriteNowAsync(ReadOnlyMemory<byte> buffer, TimeSpan timeout)
        {
            await ThisLock.WaitAsync();
            try
            {
                if (_pendingWriteSize > 0)
                {
                    int remainingSize = _writeBufferSize - _pendingWriteSize;
                    CancelFlushTimer();
                    if (buffer.Length <= remainingSize)
                    {
                        buffer.CopyTo(_writeBuffer.Slice(_pendingWriteSize));
                        _pendingWriteSize += buffer.Length;
                        await FlushCoreAsync(timeout);
                        return;
                    }
                    else
                    {
                        TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                        await FlushCoreAsync(timeoutHelper.RemainingTime());
                        timeout = timeoutHelper.RemainingTime();
                    }
                }
 
                await Connection.WriteAsync(buffer, true, timeout);
            }
            finally
            {
                ThisLock.Release();
            }
        }
 
        private async ValueTask WriteLaterAsync(ReadOnlyMemory<byte> buffer, TimeSpan timeout)
        {
            await ThisLock.WaitAsync();
            try
            {
                bool setTimer = _pendingWriteSize == 0;
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
                while (buffer.Length > 0)
                {
                    if (buffer.Length >= _writeBufferSize && _pendingWriteSize == 0)
                    {
                        await Connection.WriteAsync(buffer, false, timeoutHelper.RemainingTime());
                        buffer = default;
                    }
                    else
                    {
                        if (_writeBuffer.IsEmpty)
                        {
                            _writeBuffer = Fx.AllocateByteArray(_writeBufferSize);
                        }
 
                        int remainingSize = _writeBufferSize - _pendingWriteSize;
                        int copySize = buffer.Length;
                        if (copySize > remainingSize)
                        {
                            copySize = remainingSize;
                        }
 
                        buffer.CopyTo(_writeBuffer.Slice(_pendingWriteSize, copySize));
                        _pendingWriteSize += copySize;
                        if (_pendingWriteSize == _writeBufferSize)
                        {
                            await FlushCoreAsync(timeoutHelper.RemainingTime());
                            setTimer = true;
                        }
                        buffer = buffer.Slice(copySize);
                    }
                }
                if (_pendingWriteSize > 0)
                {
                    if (setTimer)
                    {
                        SetFlushTimer();
                        _pendingTimeout = TimeoutHelper.Add(_pendingTimeout, timeoutHelper.RemainingTime());
                    }
                }
                else
                {
                    CancelFlushTimer();
                }
            }
            finally
            {
                ThisLock.Release();
            }
        }
 
        private void ThrowPendingWriteException()
        {
            if (_pendingWriteException != null)
            {
                lock (ThisLock)
                {
                    if (_pendingWriteException != null)
                    {
                        Exception exceptionTothrow = _pendingWriteException;
                        _pendingWriteException = null;
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(exceptionTothrow);
                    }
                }
            }
        }
    }
 
    internal class BufferedConnectionInitiator : IConnectionInitiator
    {
        private int _writeBufferSize;
        private TimeSpan _flushTimeout;
        private IConnectionInitiator _connectionInitiator;
 
        public BufferedConnectionInitiator(IConnectionInitiator connectionInitiator, TimeSpan flushTimeout, int writeBufferSize)
        {
            _connectionInitiator = connectionInitiator;
            _flushTimeout = flushTimeout;
            _writeBufferSize = writeBufferSize;
        }
 
        protected TimeSpan FlushTimeout => _flushTimeout;
 
        protected int WriteBufferSize => _writeBufferSize;
 
        public async ValueTask<IConnection> ConnectAsync(Uri uri, TimeSpan timeout)
        {
            return new BufferedConnection(await _connectionInitiator.ConnectAsync(uri, timeout), _flushTimeout, _writeBufferSize);
        }
    }
}