File: FrameworkFork\System.ServiceModel\System\ServiceModel\Channels\BufferedWriteStream.cs
Web Access
Project: src\src\dotnet-svcutil\lib\src\dotnet-svcutil-lib.csproj (dotnet-svcutil-lib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.ServiceModel.Channels
{
    /// <summary>
    /// This class is based on BufferedStream from the Desktop version of .NET. Only the write functionality
    /// is needed by WCF so the read capability has been removed. This allowed some extra logic to be removed
    /// from the write code path. Also some validation code has been removed as this class is no longer
    /// general purpose and is only used in pre-known scenarios and only called by WCF code. Some validation
    /// checks have been converted to only run on a debug build to allow catching code bugs in other WCF code, 
    /// but not causing release build overhead.
    /// 
    /// One of the design goals here is to prevent the buffer from getting in the way and slowing
    /// down underlying stream accesses when it is not needed.
    /// See a large comment in Write for the details of the write buffer heuristic.
    /// 
    /// This class will never cache more bytes than the max specified buffer size.
    /// However, it may use a temporary buffer of up to twice the size in order to combine several IO operations on
    /// the underlying stream into a single operation. This is because we assume that memory copies are significantly
    /// faster than IO operations on the underlying stream (if this was not true, using buffering is never appropriate).
    /// The max size of this "shadow" buffer is limited so that it is not allocated on the LOH.
    /// Shadowing is always transient. Even when using this technique, this class still guarantees that the number of
    /// bytes cached (not yet written to the target stream or not yet consumed by the user) is never larger than the 
    /// actual specified buffer size.
    /// </summary>
    internal sealed class BufferedWriteStream : Stream
    {
        // Buffer size used by the single-argument constructor.
        public const int DefaultBufferSize = 4096;
        private Stream _stream; // Underlying stream.  Dispose sets _stream to null.
        private byte[] _buffer; // Write buffer (may be replaced by the larger shadow buffer).
        private readonly int _bufferSize; // Length of internal buffer (not counting the shadow buffer).
        private int _writePos; // Write pointer within buffer.
        // Acquired by FlushAsyncInternal and WriteAsync/WriteToUnderlyingStreamAsync around buffer access.
        private readonly SemaphoreSlim _sem = new SemaphoreSlim(1, 1);
 
        /// <summary>
        /// Wraps <paramref name="stream"/> using a write buffer of <see cref="DefaultBufferSize"/> bytes.
        /// </summary>
        /// <param name="stream">The stream that receives the buffered writes.</param>
        public BufferedWriteStream(Stream stream) : this(stream, DefaultBufferSize) { }
 
        /// <summary>
        /// Wraps <paramref name="stream"/> using a write buffer of <paramref name="bufferSize"/> bytes.
        /// The arguments are checked with <c>Contract.Assert</c> only.
        /// </summary>
        /// <param name="stream">The stream that receives the buffered writes; asserted to be writable.</param>
        /// <param name="bufferSize">Size of the internal write buffer in bytes; asserted to be positive.</param>
        public BufferedWriteStream(Stream stream, int bufferSize)
        {
            // The unqualified name Null binds to the inherited Stream.Null field, so the second
            // assertion alone does not catch a null reference; assert against null explicitly first.
            Contract.Assert(stream != null, "stream!=null");
            Contract.Assert(stream != Null, "stream!=Null");
            Contract.Assert(bufferSize > 0, "bufferSize>0");
            Contract.Assert(stream.CanWrite);
            _stream = stream;
            _bufferSize = bufferSize;

            // The write buffer is allocated eagerly, at construction time.
            EnsureBufferAllocated();
        }
 
        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> when the underlying stream reference has been cleared.
        /// </summary>
        private void EnsureNotClosed()
        {
            if (_stream != null)
            {
                return;
            }

            throw new ObjectDisposedException("BufferedWriteStream");
        }
 
        /// <summary>
        /// Throws <see cref="NotSupportedException"/> when the underlying stream reports that it cannot be written to.
        /// </summary>
        private void EnsureCanWrite()
        {
            Contract.Requires(_stream != null);

            bool writable = _stream.CanWrite;
            if (writable)
            {
                return;
            }

            throw new NotSupportedException("write");
        }
 
        /// <summary>
        /// <code>MaxShadowBufferSize</code> is chosen such that shadow buffers are not allocated on the Large Object Heap.
        /// Currently, an object is allocated on the LOH if it is larger than 85000 bytes.
        /// We will go with exactly 80 KBytes (80 * 1024 = 81920 bytes), although this is somewhat arbitrary.
        /// </summary>
        private const int MaxShadowBufferSize = 81920; // Make sure not to get to the Large Object Heap.
 
        /// <summary>
        /// Replaces the write buffer with a larger "shadow" buffer of up to twice <c>_bufferSize</c>
        /// (capped at <c>MaxShadowBufferSize</c>), carrying over the bytes buffered so far.
        /// Does nothing when the buffer has already been enlarged or when <c>_bufferSize</c> has
        /// already reached the cap.
        /// </summary>
        private void EnsureShadowBufferAllocated()
        {
            Contract.Assert(_buffer != null);
            Contract.Assert(_bufferSize > 0);

            // A buffer whose length differs from _bufferSize has already been swapped for a shadow buffer.
            bool stillRegularSize = _buffer.Length == _bufferSize;
            bool shadowAllowed = _bufferSize < MaxShadowBufferSize;

            if (stillRegularSize && shadowAllowed)
            {
                int shadowLength = Math.Min(_bufferSize + _bufferSize, MaxShadowBufferSize);
                byte[] enlarged = new byte[shadowLength];

                // Preserve whatever is currently pending in the regular buffer.
                Array.Copy(_buffer, 0, enlarged, 0, _writePos);
                _buffer = enlarged;
            }
        }
 
        /// <summary>
        /// Allocates the write buffer (<c>_bufferSize</c> bytes) if it has not been allocated yet.
        /// </summary>
        private void EnsureBufferAllocated()
        {
            if (_buffer != null)
            {
                return;
            }

            _buffer = new byte[_bufferSize];
        }
 
        /// <summary>Always <c>false</c>; the read members of this stream throw <see cref="NotSupportedException"/>.</summary>
        public override bool CanRead
        {
            get { return false; }
        }
 
        /// <summary>
        /// <c>true</c> while the underlying stream reference is still set and that stream reports it is writable.
        /// </summary>
        public override bool CanWrite
        {
            get
            {
                if (_stream == null)
                {
                    return false;
                }

                return _stream.CanWrite;
            }
        }
 
        /// <summary>Always <c>false</c>; <c>Seek</c>, <c>Position</c> and <c>Length</c> throw <see cref="NotSupportedException"/>.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }
 
        /// <summary>Not supported by this stream.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override long Length
        {
            // The message names the member actually being accessed (it previously said "Position").
            get { throw new NotSupportedException("Length"); }
        }
 
        /// <summary>Not supported by this stream; both accessors throw.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override long Position
        {
            get { throw new NotSupportedException("Position"); }
            set { throw new NotSupportedException("Position"); }
        }
 
        /// <summary>
        /// When <paramref name="disposing"/> is <c>true</c> and the stream is still open, flushes the
        /// buffered data and disposes the underlying stream. In every case the stream and buffer
        /// references are cleared and the base implementation is invoked.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>.</param>
        protected override void Dispose(bool disposing)
        {
            try
            {
                if (!disposing || _stream == null)
                {
                    // Nothing to flush or close; the outer finally still runs.
                    return;
                }

                try
                {
                    Flush();
                }
                finally
                {
                    // The underlying stream is disposed even if the final flush failed.
                    _stream.Dispose();
                }
            }
            finally
            {
                _stream = null;
                _buffer = null;

                // Let the base class clean up its async IO resources.
                base.Dispose(disposing);
            }
        }
 
        /// <summary>
        /// Writes any buffered bytes to the underlying stream and flushes it. When the buffer is
        /// empty, only the underlying stream is flushed.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override void Flush()
        {
            EnsureNotClosed();

            if (_writePos <= 0)
            {
                // Nothing is buffered, but the underlying stream must still be told to flush.
                _stream.Flush();
                return;
            }

            // There is pending WRITE data; FlushWrite also flushes the underlying stream.
            FlushWrite();
            Contract.Assert(_writePos == 0);
        }
 
        /// <summary>
        /// Asynchronous counterpart of <c>Flush</c>. Returns a canceled task when cancellation has
        /// already been requested; otherwise verifies the stream is open (throwing synchronously if
        /// it is not) and delegates to <c>FlushAsyncInternal</c>.
        /// </summary>
        /// <param name="cancellationToken">Token observed before starting and passed on to the underlying stream.</param>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override Task FlushAsync(CancellationToken cancellationToken)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                EnsureNotClosed();

                return FlushAsyncInternal(cancellationToken);
            }

            return Task.FromCanceled<int>(cancellationToken);
        }
 
        /// <summary>
        /// Acquires the semaphore, then either writes out and flushes the buffered bytes or, when the
        /// buffer is empty, just flushes the underlying stream. The semaphore is always released.
        /// </summary>
        private async Task FlushAsyncInternal(CancellationToken cancellationToken)
        {
            await _sem.WaitAsync().ConfigureAwait(false);
            try
            {
                // With an empty buffer we still need to tell the underlying stream to flush.
                Task pendingFlush = _writePos > 0
                    ? FlushWriteAsync(cancellationToken)
                    : _stream.FlushAsync(cancellationToken);

                await pendingFlush.ConfigureAwait(false);

                // Either way the buffer must be empty at this point.
                Contract.Assert(_writePos == 0);
            }
            finally
            {
                _sem.Release();
            }
        }
 
        /// <summary>
        /// Writes the buffered bytes to the underlying stream, resets the write position and
        /// flushes the underlying stream.
        /// </summary>
        private void FlushWrite()
        {
            Contract.Assert(_buffer != null && _writePos <= _bufferSize,
                "BufferedWriteStream: Write buffer must be allocated and write position must be in the bounds of the buffer in FlushWrite!");

            int pendingBytes = _writePos;
            _stream.Write(_buffer, 0, pendingBytes);

            // Only reset once the write succeeded, so a failed write keeps the data buffered.
            _writePos = 0;
            _stream.Flush();
        }
 
        /// <summary>
        /// Asynchronously writes the buffered bytes to the underlying stream, resets the write
        /// position and flushes the underlying stream.
        /// </summary>
        private async Task FlushWriteAsync(CancellationToken cancellationToken)
        {
            Contract.Assert(_buffer != null && _writePos <= _bufferSize,
                "BufferedWriteStream: Write buffer must be allocated and write position must be in the bounds of the buffer in FlushWrite!");

            int pendingBytes = _writePos;
            await _stream.WriteAsync(_buffer, 0, pendingBytes, cancellationToken).ConfigureAwait(false);

            // Only reset once the write completed, so a failed write keeps the data buffered.
            _writePos = 0;
            await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
        }
 
        /// <summary>Reading is not supported by this stream.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override int Read([In, Out] byte[] array, int offset, int count)
        {
            throw new NotSupportedException("Read");
        }
 
        /// <summary>
        /// Reading is not supported by this stream. The exception is thrown directly from the call
        /// rather than being delivered through the returned task.
        /// </summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            throw new NotSupportedException("ReadAsync");
        }
 
        /// <summary>Reading is not supported by this stream.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override int ReadByte()
        {
            throw new NotSupportedException("ReadByte");
        }
 
        /// <summary>
        /// Copies as many bytes as fit (relative to <c>_bufferSize</c>) from <paramref name="array"/>
        /// into the write buffer and advances <paramref name="offset"/> / decreases
        /// <paramref name="count"/> by the number of bytes copied.
        /// </summary>
        /// <param name="array">Source of the bytes.</param>
        /// <param name="offset">Position in <paramref name="array"/> to copy from; advanced past the copied bytes.</param>
        /// <param name="count">Number of bytes still to be written; reduced by the copied amount.</param>
        private void WriteToBuffer(byte[] array, ref int offset, ref int count)
        {
            int freeSpace = _bufferSize - _writePos;
            int chunk = Math.Min(freeSpace, count);

            if (chunk > 0)
            {
                Array.Copy(array, offset, _buffer, _writePos, chunk);

                _writePos += chunk;
                count -= chunk;
                offset += chunk;
            }
        }
 
        /// <summary>
        /// Non-throwing wrapper around the three-argument <c>WriteToBuffer</c>: any exception raised
        /// by the copy is captured in <paramref name="error"/> instead of propagating.
        /// </summary>
        /// <param name="error">Set to the captured exception, or <c>null</c> when the copy succeeded.</param>
        private void WriteToBuffer(byte[] array, ref int offset, ref int count, out Exception error)
        {
            error = null;

            try
            {
                WriteToBuffer(array, ref offset, ref count);
            }
            catch (Exception caught)
            {
                error = caught;
            }
        }
 
        /// <summary>
        /// Writes <paramref name="count"/> bytes from <paramref name="array"/>, starting at
        /// <paramref name="offset"/>, either into the internal buffer or directly to the underlying
        /// stream, as decided by the heuristic described in the comment inside this method.
        /// The arguments are checked with <c>Contract.Assert</c> only.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="NotSupportedException">The underlying stream cannot be written to.</exception>
        public override void Write(byte[] array, int offset, int count)
        {
            Contract.Assert(array != null);
            Contract.Assert(offset >= 0);
            Contract.Assert(count >= 0);
            Contract.Assert(count <= array.Length - offset);

            EnsureNotClosed();
            EnsureCanWrite();

            #region Write algorithm comment
            // We need to use the buffer, while avoiding unnecessary buffer usage / memory copies.
            // We ASSUME that memory copies are much cheaper than writes to the underlying stream, so if an extra copy is
            // guaranteed to reduce the number of writes, we prefer it.
            // We pick a simple strategy that makes degenerate cases rare if our assumptions are right.
            //
            // For every write, we use a simple heuristic (below) to decide whether to use the buffer.
            // The heuristic has the desirable property (*) that if the specified user data can fit into the currently available
            // buffer space without filling it up completely, the heuristic will always tell us to use the buffer. It will also
            // tell us to use the buffer in cases where the current write would fill the buffer, but the remaining data is small
            // enough such that subsequent operations can use the buffer again.
            //
            // Algorithm:
            // Determine whether or not to buffer according to the heuristic (below).
            // If we decided to use the buffer:
            //     Copy as much user data as we can into the buffer.
            //     If we consumed all data: We are finished.
            //     Otherwise, write the buffer out.
            //     Copy the rest of user data into the now cleared buffer (no need to write out the buffer again as the heuristic
            //     will prevent it from being filled twice).
            // If we decided not to use the buffer:
            //     Can the data already in the buffer and current user data be combined to a single write
            //     by allocating a "shadow" buffer of up to twice the size of _bufferSize (up to a limit to avoid LOH)?
            //     Yes, it can:
            //         Allocate a larger "shadow" buffer and ensure the buffered data is moved there.
            //         Copy user data to the shadow buffer.
            //         Write shadow buffer to the underlying stream in a single operation.
            //     No, it cannot (amount of data is still too large):
            //         Write out any data possibly in the buffer.
            //         Write out user data directly.
            //
            // Heuristic:
            // If the subsequent write operation that follows the current write operation will result in a write to the
            // underlying stream in case that we use the buffer in the current write, while it would not have if we avoided
            // using the buffer in the current write (by writing current user data to the underlying stream directly), then we
            // prefer to avoid using the buffer since the corresponding memory copy is wasted (it will not reduce the number
            // of writes to the underlying stream, which is what we are optimising for).
            // ASSUME that the next write will be for the same amount of bytes as the current write (most common case) and
            // determine if it will cause a write to the underlying stream. If the next write is actually larger, our heuristic
            // still yields the right behaviour, if the next write is actually smaller, we may be making an unnecessary write to
            // the underlying stream. However, this can only occur if the current write is larger than half the buffer size and
            // we will recover after one iteration.
            // We have:
            //     useBuffer = (_writePos + count + count < _bufferSize + _bufferSize)
            //
            // Example with _bufferSize = 20, _writePos = 6, count = 10:
            //
            //     +---------------------------------------+---------------------------------------+
            //     |             current buffer            | next iteration's "future" buffer      |
            //     +---------------------------------------+---------------------------------------+
            //     |0| | | | | | | | | |1| | | | | | | | | |2| | | | | | | | | |3| | | | | | | | | |
            //     |0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9|
            //     +-----------+-------------------+-------------------+---------------------------+
            //     | _writePos |  current count    | assumed next count|avail buff after next write|
            //     +-----------+-------------------+-------------------+---------------------------+
            //
            // A nice property (*) of this heuristic is that it will always succeed if the user data completely fits into the
            // available buffer, i.e. if count < (_bufferSize - _writePos).
            #endregion Write algorithm comment

            Contract.Assert(_writePos < _bufferSize);

            int combinedLength;
            bool useBuffer;
            checked
            {
                // Buffer sizes large enough to overflow are not expected; if it happens anyway, fail early.
                combinedLength = _writePos + count;
                useBuffer = (combinedLength + count < (_bufferSize + _bufferSize));
            }

            if (!useBuffer)
            {
                // Bypass path: first get rid of whatever is pending in the buffer.
                if (_writePos > 0)
                {
                    Contract.Assert(_buffer != null);
                    Contract.Assert(combinedLength >= _bufferSize);

                    // Pending data plus user data may fit into one shadow-buffer write, saving a write call.
                    bool fitsShadowBuffer = combinedLength <= (_bufferSize + _bufferSize)
                                            && combinedLength <= MaxShadowBufferSize;
                    if (fitsShadowBuffer)
                    {
                        EnsureShadowBufferAllocated();
                        Array.Copy(array, offset, _buffer, _writePos, count);
                        _stream.Write(_buffer, 0, combinedLength);
                        _writePos = 0;
                        return;
                    }

                    _stream.Write(_buffer, 0, _writePos);
                    _writePos = 0;
                }

                // Hand the user data to the underlying stream directly.
                _stream.Write(array, offset, count);
                return;
            }

            // Buffered path: copy as much as fits.
            WriteToBuffer(array, ref offset, ref count);

            if (_writePos < _bufferSize)
            {
                Contract.Assert(count == 0);
                return;
            }

            Contract.Assert(count >= 0);
            Contract.Assert(_writePos == _bufferSize);
            Contract.Assert(_buffer != null);

            // The buffer is full: write it out, then buffer the remainder (the heuristic guarantees it fits).
            _stream.Write(_buffer, 0, _writePos);
            _writePos = 0;

            WriteToBuffer(array, ref offset, ref count);

            Contract.Assert(count == 0);
            Contract.Assert(_writePos < _bufferSize);
        }
 
        /// <summary>
        /// Asynchronous write. Returns a canceled task when cancellation was already requested.
        /// When the semaphore can be taken immediately and the data fits into the free buffer space
        /// without filling it, the bytes are copied synchronously and a completed (or faulted) task
        /// is returned; otherwise the work is delegated to <c>WriteToUnderlyingStreamAsync</c>.
        /// The arguments are checked with <c>Contract.Assert</c> only.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="NotSupportedException">The underlying stream cannot be written to.</exception>
        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            Contract.Assert(buffer != null);
            Contract.Assert(offset >= 0);
            Contract.Assert(count >= 0);
            Contract.Assert(count <= buffer.Length - offset);

            // Fast path: cancellation was requested before we even started.
            if (cancellationToken.IsCancellationRequested)
            {
                return Task.FromCanceled<int>(cancellationToken);
            }

            EnsureNotClosed();
            EnsureCanWrite();

            // Even the synchronous fast path needs the semaphore, because another async operation may be
            // touching the buffer. If the semaphore is not granted immediately, go fully asynchronous.
            Task lockTask = _sem.WaitAsync();
            bool lockHeld = lockTask.Status == TaskStatus.RanToCompletion;

            if (lockHeld)
            {
                // Stays true unless ownership of the semaphore is handed to WriteToUnderlyingStreamAsync.
                bool releaseHere = true;
                try
                {
                    Contract.Assert(_writePos < _bufferSize);

                    // Only a write that leaves the buffer not completely full can finish synchronously.
                    int freeSpace = _bufferSize - _writePos;
                    releaseHere = (count < freeSpace);

                    if (releaseHere)
                    {
                        Exception failure;
                        WriteToBuffer(buffer, ref offset, ref count, out failure);
                        Contract.Assert(count == 0);

                        if (failure != null)
                        {
                            return Task.FromException(failure);
                        }

                        return Task.CompletedTask;
                    }
                }
                finally
                {
                    if (releaseHere)
                    {
                        _sem.Release();
                    }
                }
            }

            // Slow path: the async implementation awaits the lock task and releases the semaphore itself.
            return WriteToUnderlyingStreamAsync(buffer, offset, count, cancellationToken, lockTask);
        }
 
        /// <summary>
        /// Asynchronous slow path of <c>WriteAsync</c>: waits for the semaphore represented by
        /// <paramref name="semaphoreLockTask"/>, then applies the same buffer / shadow-buffer /
        /// direct-write algorithm as <c>Write</c> using asynchronous writes on the underlying stream.
        /// The semaphore is released on every exit path once it has been acquired.
        /// </summary>
        /// <param name="array">Source of the bytes to write.</param>
        /// <param name="offset">Position in <paramref name="array"/> at which the data starts.</param>
        /// <param name="count">Number of bytes to write.</param>
        /// <param name="cancellationToken">Token passed to the underlying stream's asynchronous writes.</param>
        /// <param name="semaphoreLockTask">The task returned by <c>_sem.WaitAsync()</c> in <c>WriteAsync</c>.</param>
        private async Task WriteToUnderlyingStreamAsync(byte[] array, int offset, int count,
                                                        CancellationToken cancellationToken,
                                                        Task semaphoreLockTask)
        {
            // See the LARGE COMMENT in Write(..) for the explanation of the write buffer algorithm.

            await semaphoreLockTask.ConfigureAwait(false);
            try
            {
                // Validate only after the semaphore is held and inside the try block:
                //  - if a check throws, the finally below still releases the semaphore (throwing before the
                //    await would leave it acquired forever and block every later async operation);
                //  - the stream may have been disposed while this operation was waiting for the semaphore.
                EnsureNotClosed();
                EnsureCanWrite();

                // The buffer might have been changed by another async task while we were waiting on the semaphore.
                // However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE.
                int totalUserBytes;
                bool useBuffer;
                checked
                {
                    // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early:
                    totalUserBytes = _writePos + count;
                    useBuffer = (totalUserBytes + count < (_bufferSize + _bufferSize));
                }

                if (useBuffer)
                {
                    WriteToBuffer(array, ref offset, ref count);

                    if (_writePos < _bufferSize)
                    {
                        Contract.Assert(count == 0);
                        return;
                    }

                    Contract.Assert(count >= 0);
                    Contract.Assert(_writePos == _bufferSize);
                    Contract.Assert(_buffer != null);

                    // Buffer is full: write it out, then buffer the remainder.
                    await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false);
                    _writePos = 0;

                    WriteToBuffer(array, ref offset, ref count);

                    Contract.Assert(count == 0);
                    Contract.Assert(_writePos < _bufferSize);
                }
                else
                {
                    // if (!useBuffer)

                    // Write out the buffer if necessary.
                    if (_writePos > 0)
                    {
                        Contract.Assert(_buffer != null);
                        Contract.Assert(totalUserBytes >= _bufferSize);

                        // Try avoiding extra write to underlying stream by combining previously buffered data with current user data:
                        if (totalUserBytes <= (_bufferSize + _bufferSize) && totalUserBytes <= MaxShadowBufferSize)
                        {
                            EnsureShadowBufferAllocated();
                            Buffer.BlockCopy(array, offset, _buffer, _writePos, count);
                            await _stream.WriteAsync(_buffer, 0, totalUserBytes, cancellationToken).ConfigureAwait(false);
                            _writePos = 0;
                            return;
                        }

                        await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false);
                        _writePos = 0;
                    }

                    // Write out user data.
                    await _stream.WriteAsync(array, offset, count, cancellationToken).ConfigureAwait(false);
                }
            }
            finally
            {
                _sem.Release();
            }
        }
 
        /// <summary>
        /// Appends a single byte to the write buffer. When at most one free slot is left, the buffer
        /// is first written out and the underlying stream flushed.
        /// </summary>
        /// <param name="value">The byte to buffer.</param>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override void WriteByte(byte value)
        {
            EnsureNotClosed();

            // Strictly speaking only a write to the underlying stream is needed here, not a flush,
            // but the previous version flushed, so that behaviour is kept.
            int lastSlot = _bufferSize - 1;
            bool needsFlush = _writePos >= lastSlot;
            if (needsFlush)
            {
                FlushWrite();
            }

            _buffer[_writePos++] = value;

            Contract.Assert(_writePos < _bufferSize);
        }
 
 
        /// <summary>Seeking is not supported by this stream.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException("seek");
        }
 
 
        /// <summary>Changing the length is not supported by this stream.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException("SetLength");
        }
    }
}