|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Text.Json
{
internal sealed class PooledByteBufferWriter : PipeWriter, IDisposable
{
// This class allows two possible configurations: if rentedBuffer is not null then
// it can be used as an IBufferWriter and holds a buffer that should eventually be
// returned to the shared pool. If rentedBuffer is null, then the instance is in a
// cleared/disposed state and it must re-rent a buffer before it can be used again.
private byte[]? _rentedBuffer;
private int _index;
private readonly Stream? _stream;
private const int MinimumBufferSize = 256;
// Value copied from Array.MaxLength in System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Array.cs.
public const int MaximumBufferSize = 0X7FFFFFC7;
private PooledByteBufferWriter()
{
#if NET
// Ensure we are in sync with the Array.MaxLength implementation.
Debug.Assert(MaximumBufferSize == Array.MaxLength);
#endif
}
public PooledByteBufferWriter(int initialCapacity) : this()
{
Debug.Assert(initialCapacity > 0);
_rentedBuffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
}
public PooledByteBufferWriter(int initialCapacity, Stream stream) : this(initialCapacity)
{
_stream = stream;
}
public ReadOnlyMemory<byte> WrittenMemory
{
get
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);
return _rentedBuffer.AsMemory(0, _index);
}
}
public int WrittenCount
{
get
{
Debug.Assert(_rentedBuffer != null);
return _index;
}
}
public int Capacity
{
get
{
Debug.Assert(_rentedBuffer != null);
return _rentedBuffer.Length;
}
}
public int FreeCapacity
{
get
{
Debug.Assert(_rentedBuffer != null);
return _rentedBuffer.Length - _index;
}
}
public void Clear()
{
ClearHelper();
}
public void ClearAndReturnBuffers()
{
Debug.Assert(_rentedBuffer != null);
ClearHelper();
byte[] toReturn = _rentedBuffer;
_rentedBuffer = null;
ArrayPool<byte>.Shared.Return(toReturn);
}
private void ClearHelper()
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);
_rentedBuffer.AsSpan(0, _index).Clear();
_index = 0;
}
// Returns the rented buffer back to the pool
public void Dispose()
{
if (_rentedBuffer == null)
{
return;
}
ClearHelper();
byte[] toReturn = _rentedBuffer;
_rentedBuffer = null;
ArrayPool<byte>.Shared.Return(toReturn);
}
public void InitializeEmptyInstance(int initialCapacity)
{
Debug.Assert(initialCapacity > 0);
Debug.Assert(_rentedBuffer is null);
_rentedBuffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
}
public static PooledByteBufferWriter CreateEmptyInstanceForCaching() => new PooledByteBufferWriter();
public override void Advance(int count)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(count >= 0);
Debug.Assert(_index <= _rentedBuffer.Length - count);
_index += count;
}
public override Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_index);
}
public override Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_index);
}
#if NET
internal void WriteToStream(Stream destination)
{
destination.Write(WrittenMemory.Span);
}
#else
internal void WriteToStream(Stream destination)
{
Debug.Assert(_rentedBuffer != null);
destination.Write(_rentedBuffer, 0, _index);
}
#endif
private void CheckAndResizeBuffer(int sizeHint)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(sizeHint > 0);
int currentLength = _rentedBuffer.Length;
int availableSpace = currentLength - _index;
// If we've reached ~1GB written, grow to the maximum buffer
// length to avoid incessant minimal growths causing perf issues.
if (_index >= MaximumBufferSize / 2)
{
sizeHint = Math.Max(sizeHint, MaximumBufferSize - currentLength);
}
if (sizeHint > availableSpace)
{
int growBy = Math.Max(sizeHint, currentLength);
int newSize = currentLength + growBy;
if ((uint)newSize > MaximumBufferSize)
{
newSize = currentLength + sizeHint;
if ((uint)newSize > MaximumBufferSize)
{
ThrowHelper.ThrowOutOfMemoryException_BufferMaximumSizeExceeded((uint)newSize);
}
}
byte[] oldBuffer = _rentedBuffer;
_rentedBuffer = ArrayPool<byte>.Shared.Rent(newSize);
Debug.Assert(oldBuffer.Length >= _index);
Debug.Assert(_rentedBuffer.Length >= _index);
Span<byte> oldBufferAsSpan = oldBuffer.AsSpan(0, _index);
oldBufferAsSpan.CopyTo(_rentedBuffer);
oldBufferAsSpan.Clear();
ArrayPool<byte>.Shared.Return(oldBuffer);
}
Debug.Assert(_rentedBuffer.Length - _index > 0);
Debug.Assert(_rentedBuffer.Length - _index >= sizeHint);
}
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(_stream is not null);
#if NET
await _stream.WriteAsync(WrittenMemory, cancellationToken).ConfigureAwait(false);
#else
Debug.Assert(_rentedBuffer != null);
await _stream.WriteAsync(_rentedBuffer, 0, _index, cancellationToken).ConfigureAwait(false);
#endif
Clear();
return new FlushResult(isCanceled: false, isCompleted: false);
}
public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _index;
// This type is used internally in JsonSerializer to help buffer and flush bytes to the underlying Stream.
// It's only pretending to be a PipeWriter and doesn't need Complete or CancelPendingFlush for the internal usage.
public override void CancelPendingFlush() => throw new NotImplementedException();
public override void Complete(Exception? exception = null) => throw new NotImplementedException();
}
internal static partial class ThrowHelper
{
[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void ThrowOutOfMemoryException_BufferMaximumSizeExceeded(uint capacity)
{
throw new OutOfMemoryException(SR.Format(SR.BufferMaximumSizeExceeded, capacity));
}
}
}
|