File: src\Shared\SegmentWriteStream.cs
Web Access
Project: src\src\Shared\test\Shared.Tests\Microsoft.AspNetCore.Shared.Tests.csproj (Microsoft.AspNetCore.Shared.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
namespace Microsoft.AspNetCore.WriteStream;
 
internal sealed class SegmentWriteStream : Stream
{
    private readonly List<byte[]> _segments = new();
    private readonly MemoryStream _bufferStream = new();
    private readonly int _segmentSize;
    private long _length;
    private bool _closed;
    private bool _disposed;
 
    internal SegmentWriteStream(int segmentSize)
    {
        if (segmentSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(segmentSize), segmentSize, $"{nameof(segmentSize)} must be greater than 0.");
        }
 
        _segmentSize = segmentSize;
    }
 
    // Extracting the buffered segments closes the stream for writing
    internal List<byte[]> GetSegments()
    {
        if (!_closed)
        {
            _closed = true;
            FinalizeSegments();
        }
        return _segments;
    }
 
    public override bool CanRead => false;
 
    public override bool CanSeek => false;
 
    public override bool CanWrite => !_closed;
 
    public override long Length => _length;
 
    public override long Position
    {
        get
        {
            return _length;
        }
        set
        {
            throw new NotSupportedException("The stream does not support seeking.");
        }
    }
 
    private void DisposeMemoryStream()
    {
        // Clean up the memory stream
        _bufferStream.SetLength(0);
        _bufferStream.Capacity = 0;
        _bufferStream.Dispose();
    }
 
    private void FinalizeSegments()
    {
        // Append any remaining segments
        if (_bufferStream.Length > 0)
        {
            // Add the last segment
            _segments.Add(_bufferStream.ToArray());
        }
 
        DisposeMemoryStream();
    }
 
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (_disposed)
            {
                return;
            }
 
            if (disposing)
            {
                _segments.Clear();
                DisposeMemoryStream();
            }
 
            _disposed = true;
            _closed = true;
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
 
    public override void Flush()
    {
        // No-op
    }
 
    public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
 
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("The stream does not support reading.");
    }
 
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("The stream does not support seeking.");
    }
 
    public override void SetLength(long value)
    {
        throw new NotSupportedException("The stream does not support seeking.");
    }
 
    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArguments(buffer, offset, count);
        if (!CanWrite)
        {
            throw new ObjectDisposedException(nameof(SegmentWriteStream), "The stream has been closed for writing.");
        }
 
        Write(buffer.AsSpan(offset, count));
    }
 
    public override void Write(ReadOnlySpan<byte> buffer)
    {
        while (!buffer.IsEmpty)
        {
            if ((int)_bufferStream.Length == _segmentSize)
            {
                _segments.Add(_bufferStream.ToArray());
                _bufferStream.SetLength(0);
            }
 
            var bytesWritten = Math.Min(buffer.Length, _segmentSize - (int)_bufferStream.Length);
 
            _bufferStream.Write(buffer[..bytesWritten]);
            buffer = buffer[bytesWritten..];
            _length += bytesWritten;
        }
    }
 
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        Write(buffer, offset, count);
        return Task.CompletedTask;
    }
 
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
    {
        Write(buffer.Span);
        return default;
    }
 
    public override void WriteByte(byte value)
    {
        if (!CanWrite)
        {
            throw new ObjectDisposedException(nameof(SegmentWriteStream), "The stream has been closed for writing.");
        }
 
        if ((int)_bufferStream.Length == _segmentSize)
        {
            _segments.Add(_bufferStream.ToArray());
            _bufferStream.SetLength(0);
        }
 
        _bufferStream.WriteByte(value);
        _length++;
    }
 
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
        => TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state);
 
    public override void EndWrite(IAsyncResult asyncResult)
        => TaskToApm.End(asyncResult);
}