File: System\Text\Json\Serialization\ReadBufferState.cs
Web Access
Project: src\src\libraries\System.Text.Json\src\System.Text.Json.csproj (System.Text.Json)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Text.Json.Serialization
{
    [StructLayout(LayoutKind.Auto)]
    internal struct ReadBufferState : IDisposable
    {
        private byte[] _buffer;
        private byte _offset; // Read bytes offset typically used when skipping the UTF-8 BOM.
        private int _count; // Number of read bytes yet to be consumed by the serializer.
        private int _maxCount; // Number of bytes we need to clear before returning the buffer.
        private bool _isFirstBlock;
        private bool _isFinalBlock;
 
        // An "unsuccessful read" in this context refers to a buffer read operation that
        // wasn't sufficient to advance the reader to the next token. This occurs primarily
        // when consuming large JSON strings (which don't support streaming today) but is
        // also possible with other token types such as numbers, booleans, or nulls.
        //
        // The JsonSerializer.DeserializeAsyncEnumerable methods employ a special buffering
        // strategy where rather than attempting to fill the entire buffer, the deserializer
        // will be invoked as soon as the first chunk of data is read from the stream.
        // This is to ensure liveness: data should be surfaced on the IAE as soon as they
        // are streamed from the server. On the other hand, this can create performance
        // problems in cases where the underlying stream uses extremely fine-grained buffering.
        // For this reason, we employ a threshold that will revert to buffer filling once crossed.
        // The counter is reset to zero whenever the JSON reader has been advanced successfully.
        //
        // The threshold is set to 5 unsuccessful reads. This is a relatively conservative threshold
        // but should still make fallback unlikely in most scenaria. It should ensure that fallback
        // isn't triggered in null or boolean tokens even in the worst-case scenario where they are
        // streamed one byte at a time.
        private const int UnsuccessfulReadCountThreshold = 5;
        private int _unsuccessfulReadCount;
 
        public ReadBufferState(int initialBufferSize)
        {
            _buffer = ArrayPool<byte>.Shared.Rent(Math.Max(initialBufferSize, JsonConstants.Utf8Bom.Length));
            _maxCount = _count = _offset = 0;
            _isFirstBlock = true;
            _isFinalBlock = false;
        }
 
        public readonly bool IsFinalBlock => _isFinalBlock;
 
        public readonly ReadOnlySpan<byte> Bytes => _buffer.AsSpan(_offset, _count);
 
        /// <summary>
        /// Read from the stream until either our buffer is filled or we hit EOF.
        /// Calling ReadCore is relatively expensive, so we minimize the number of times
        /// we need to call it.
        /// </summary>
        public readonly async ValueTask<ReadBufferState> ReadFromStreamAsync(
            Stream utf8Json,
            CancellationToken cancellationToken,
            bool fillBuffer = true)
        {
            // Since mutable structs don't work well with async state machines,
            // make all updates on a copy which is returned once complete.
            ReadBufferState bufferState = this;
 
            int minBufferCount = fillBuffer || _unsuccessfulReadCount > UnsuccessfulReadCountThreshold ? bufferState._buffer.Length : 0;
            do
            {
                int bytesRead = await utf8Json.ReadAsync(
#if NET
                    bufferState._buffer.AsMemory(bufferState._count),
#else
                    bufferState._buffer, bufferState._count, bufferState._buffer.Length - bufferState._count,
#endif
                    cancellationToken).ConfigureAwait(false);
 
                if (bytesRead == 0)
                {
                    bufferState._isFinalBlock = true;
                    break;
                }
 
                bufferState._count += bytesRead;
            }
            while (bufferState._count < minBufferCount);
 
            bufferState.ProcessReadBytes();
            return bufferState;
        }
 
        /// <summary>
        /// Read from the stream until either our buffer is filled or we hit EOF.
        /// Calling ReadCore is relatively expensive, so we minimize the number of times
        /// we need to call it.
        /// </summary>
        public void ReadFromStream(Stream utf8Json)
        {
            do
            {
                int bytesRead = utf8Json.Read(
#if NET
                    _buffer.AsSpan(_count));
#else
                    _buffer, _count, _buffer.Length - _count);
#endif
 
                if (bytesRead == 0)
                {
                    _isFinalBlock = true;
                    break;
                }
 
                _count += bytesRead;
            }
            while (_count < _buffer.Length);
 
            ProcessReadBytes();
        }
 
        /// <summary>
        /// Advances the buffer in anticipation of a subsequent read operation.
        /// </summary>
        public void AdvanceBuffer(int bytesConsumed)
        {
            Debug.Assert(bytesConsumed <= _count);
 
            _unsuccessfulReadCount = bytesConsumed == 0 ? _unsuccessfulReadCount + 1 : 0;
            _count -= bytesConsumed;
 
            if (!_isFinalBlock)
            {
                // Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization.
                if ((uint)_count > ((uint)_buffer.Length / 2))
                {
                    // We have less than half the buffer available, double the buffer size.
                    byte[] oldBuffer = _buffer;
                    int oldMaxCount = _maxCount;
                    byte[] newBuffer = ArrayPool<byte>.Shared.Rent((_buffer.Length < (int.MaxValue / 2)) ? _buffer.Length * 2 : int.MaxValue);
 
                    // Copy the unprocessed data to the new buffer while shifting the processed bytes.
                    Buffer.BlockCopy(oldBuffer, _offset + bytesConsumed, newBuffer, 0, _count);
                    _buffer = newBuffer;
                    _maxCount = _count;
 
                    // Clear and return the old buffer
                    new Span<byte>(oldBuffer, 0, oldMaxCount).Clear();
                    ArrayPool<byte>.Shared.Return(oldBuffer);
                }
                else if (_count != 0)
                {
                    // Shift the processed bytes to the beginning of buffer to make more room.
                    Buffer.BlockCopy(_buffer, _offset + bytesConsumed, _buffer, 0, _count);
                }
            }
 
            _offset = 0;
        }
 
        private void ProcessReadBytes()
        {
            if (_count > _maxCount)
            {
                _maxCount = _count;
            }
 
            if (_isFirstBlock)
            {
                _isFirstBlock = false;
 
                // Handle the UTF-8 BOM if present
                Debug.Assert(_buffer.Length >= JsonConstants.Utf8Bom.Length);
                if (_buffer.AsSpan(0, _count).StartsWith(JsonConstants.Utf8Bom))
                {
                    _offset = (byte)JsonConstants.Utf8Bom.Length;
                    _count -= JsonConstants.Utf8Bom.Length;
                }
            }
        }
 
        public void Dispose()
        {
            // Clear only what we used and return the buffer to the pool
            new Span<byte>(_buffer, 0, _maxCount).Clear();
 
            byte[] toReturn = _buffer;
            _buffer = null!;
 
            ArrayPool<byte>.Shared.Return(toReturn);
        }
    }
}