// File: System\Text\Json\Serialization\Converters\Collection\IAsyncEnumerableOfTConverter.cs
// Web Access
// Project: src\src\libraries\System.Text.Json\src\System.Text.Json.csproj (System.Text.Json)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Text.Json.Serialization.Converters
{
    /// <summary>
    /// Collection converter for types implementing <see cref="IAsyncEnumerable{T}"/>.
    /// On read, elements are collected into a private list-backed enumerable;
    /// on write, elements are pulled from the async enumerator and the converter
    /// suspends whenever a <c>MoveNextAsync()</c> call has not completed yet.
    /// </summary>
    internal sealed class IAsyncEnumerableOfTConverter<TAsyncEnumerable, TElement>
        : JsonCollectionConverter<TAsyncEnumerable, TElement>
        where TAsyncEnumerable : IAsyncEnumerable<TElement>
    {
        // Instances are produced by CreateCollection below rather than by a create-object delegate.
        internal override bool SupportsCreateObjectDelegate
        {
            get { return false; }
        }

        /// <summary>
        /// Reads a JSON array into a buffered enumerable. Only target types that an
        /// <see cref="IAsyncEnumerable{T}"/> of <typeparamref name="TElement"/> can be
        /// assigned to are accepted; anything else is reported via the throw helper.
        /// </summary>
        internal override bool OnTryRead(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options, scoped ref ReadStack state, out TAsyncEnumerable value)
        {
            bool acceptsBufferedResult = typeToConvert.IsAssignableFrom(typeof(IAsyncEnumerable<TElement>));
            if (!acceptsBufferedResult)
            {
                ThrowHelper.ThrowNotSupportedException_CannotPopulateCollection(Type, ref reader, ref state);
            }

            return base.OnTryRead(ref reader, typeToConvert, options, ref state, out value!);
        }

        /// <summary>
        /// Installs a fresh, empty buffer as the return value of the current read frame.
        /// </summary>
        protected override void CreateCollection(ref Utf8JsonReader reader, scoped ref ReadStack state, JsonSerializerOptions options)
        {
            BufferedAsyncEnumerable emptyBuffer = new BufferedAsyncEnumerable();
            state.Current.ReturnValue = emptyBuffer;
        }

        /// <summary>
        /// Appends a deserialized element to the buffer stored on the current read frame.
        /// </summary>
        protected override void Add(in TElement value, ref ReadStack state)
        {
            BufferedAsyncEnumerable target = (BufferedAsyncEnumerable)state.Current.ReturnValue!;
            target._buffer.Add(value);
        }

        /// <summary>
        /// Entry point for writing; refuses to proceed unless the write stack
        /// reports async support, then defers to the base implementation.
        /// </summary>
        internal override bool OnTryWrite(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state)
        {
            // Guard clause: without async support on the stack this type cannot be written.
            if (!state.SupportAsync)
            {
                ThrowHelper.ThrowNotSupportedException_TypeRequiresAsyncSerialization(Type);
            }

            return base.OnTryWrite(writer, value, options, ref state);
        }

        /// <summary>
        /// Writes as many elements as are available synchronously.
        /// Returns <c>true</c> once the enumerator reports completion, and <c>false</c>
        /// when the converter has to suspend (flush requested, nested converter not
        /// finished, or a <c>MoveNextAsync()</c> call still pending).
        /// </summary>
        [Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Converter needs to consume ValueTask's in a non-async context")]
        protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state)
        {
            IAsyncEnumerator<TElement> asyncEnumerator;
            ValueTask<bool> moveNextResult;

            if (state.Current.AsyncDisposable is not null)
            {
                // Re-entry: the enumerator was parked on the stack frame by an earlier call.
                Debug.Assert(state.Current.AsyncDisposable is IAsyncEnumerator<TElement>);
                asyncEnumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncDisposable;

                if (!state.Current.AsyncEnumeratorIsPendingCompletion)
                {
                    // The suspension was not caused by a pending MoveNextAsync();
                    // the previous MoveNextAsync() must therefore have produced 'true'.
                    moveNextResult = new ValueTask<bool>(true);
                }
                else
                {
                    // The suspension was caused by a pending MoveNextAsync();
                    // pick up its (now completed) task and reset the bookkeeping.
                    Debug.Assert(state.PendingTask is Task<bool> && state.PendingTask.IsCompleted);
                    moveNextResult = new ValueTask<bool>((Task<bool>)state.PendingTask);
                    state.Current.AsyncEnumeratorIsPendingCompletion = false;
                    state.PendingTask = null;
                }
            }
            else
            {
                // First entry for this value: obtain the enumerator.
                asyncEnumerator = value.GetAsyncEnumerator(state.CancellationToken);

                // Async enumerators can only be disposed asynchronously, so the instance
                // is handed to the WriteStack for later disposal by the root async
                // serialization context. This must happen before the first MoveNextAsync()
                // call, because that call may throw and the enumerator still needs disposal.
                state.Current.AsyncDisposable = asyncEnumerator;
                moveNextResult = asyncEnumerator.MoveNextAsync();

                if (!moveNextResult.IsCompleted)
                {
                    // A pending first MoveNextAsync() is common, typically because that is
                    // when underlying network connections get established. For this case
                    // only, hold back flushing what has been buffered so far (e.g. the
                    // leading '[' of the array) so the stream owner can still recover
                    // from a connection error.
                    state.SuppressFlush = true;
                    goto SuspendDueToPendingTask;
                }
            }

            Debug.Assert(moveNextResult.IsCompleted);
            JsonConverter<TElement> elementConverter = GetElementConverter(ref state);

            // Keep writing for as long as the enumerator hands out elements synchronously.
            while (true)
            {
                bool hasElement = moveNextResult.Result;
                if (!hasElement)
                {
                    // Enumeration finished: detach the enumerator from the frame
                    // and queue it for asynchronous disposal.
                    state.Current.AsyncDisposable = null;
                    state.AddCompletedAsyncDisposable(asyncEnumerator);
                    return true;
                }

                if (ShouldFlush(writer, ref state))
                {
                    return false;
                }

                TElement currentElement = asyncEnumerator.Current;
                if (!elementConverter.TryWrite(writer, currentElement, options, ref state))
                {
                    return false;
                }

                state.Current.EndCollectionElement();
                moveNextResult = asyncEnumerator.MoveNextAsync();

                if (!moveNextResult.IsCompleted)
                {
                    // Next element is not available yet; fall through to the suspension path.
                    break;
                }
            }

        SuspendDueToPendingTask:
            // A MoveNextAsync() call is still in flight. Convert it to a regular Task
            // (which, unlike a ValueTask, may be awaited more than once), park it on the
            // WriteStack, and flag the current frame as waiting for its completion.
            Debug.Assert(state.PendingTask is null);
            state.PendingTask = moveNextResult.AsTask();
            state.Current.AsyncEnumeratorIsPendingCompletion = true;
            return false;
        }

        /// <summary>
        /// List-backed <see cref="IAsyncEnumerable{T}"/> used as the deserialization result;
        /// its enumerator yields the buffered elements in list order.
        /// </summary>
        private sealed class BufferedAsyncEnumerable : IAsyncEnumerable<TElement>
        {
            // Filled by the converter's Add method while the JSON array is being read.
            public readonly List<TElement> _buffer = new List<TElement>();

#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
            // The cancellation token is not consulted: every element is already in memory.
            public async IAsyncEnumerator<TElement> GetAsyncEnumerator(CancellationToken _)
            {
                foreach (TElement bufferedItem in _buffer)
                {
                    yield return bufferedItem;
                }
            }
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
        }
    }
}