File: Blocks\TransformManyBlock.IAsyncEnumerable.cs
Web Access
Project: src\src\libraries\System.Threading.Tasks.Dataflow\src\System.Threading.Tasks.Dataflow.csproj (System.Threading.Tasks.Dataflow)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow.Internal;
 
namespace System.Threading.Tasks.Dataflow
{
    public partial class TransformManyBlock<TInput, TOutput>
    {
        /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function.</summary>
        /// <param name="transform">
        /// The function to invoke with each data element received.  All of the data from the returned <see cref="IAsyncEnumerable{TOutput}"/>
        /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
        /// </param>
        /// <exception cref="ArgumentNullException">The <paramref name="transform"/> is <see langword="null" />.</exception>
        public TransformManyBlock(Func<TInput, IAsyncEnumerable<TOutput>> transform) :
            this(transform, ExecutionDataflowBlockOptions.Default)
        {
        }

        /// <summary>Initializes the <see cref="TransformManyBlock{TInput,TOutput}"/> with the specified function and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
        /// <param name="transform">
        /// The function to invoke with each data element received.  All of the data from the returned <see cref="IAsyncEnumerable{TOutput}"/>
        /// will be made available as output from this <see cref="TransformManyBlock{TInput,TOutput}"/>.
        /// </param>
        /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformManyBlock{TInput,TOutput}"/>.</param>
        /// <exception cref="ArgumentNullException">The <paramref name="transform"/> or <paramref name="dataflowBlockOptions"/> is <see langword="null" />.</exception>
        public TransformManyBlock(Func<TInput, IAsyncEnumerable<TOutput>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions)
        {
            if (transform is null)
            {
                throw new ArgumentNullException(nameof(transform));
            }

            // Each message is processed asynchronously (TargetCoreOptions.UsesAsyncCompletion);
            // ProcessMessageAsync reports completion of each message via SignalOneAsyncMessageCompleted.
            Initialize(messageWithId =>
            {
                Task t = ProcessMessageAsync(transform, messageWithId);
#if DEBUG
                // Task returned from ProcessMessageAsync is explicitly ignored.
                // That function handles all exceptions.
                t.ContinueWith(static t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
#endif
            }, dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion);
        }

        // Note:
        // Enumerating the IAsyncEnumerable is done with ConfigureAwait(true), using the default behavior of
        // paying attention to the current context/scheduler. This makes it so that the enumerable code runs on the target scheduler.
        // For this to work correctly, no await that uses ConfigureAwait(false) may have already resumed, either
        // earlier in the same method before these await foreach loops start or earlier in the call chain before
        // the method is invoked. (ConfigureAwait(false) applied to the await *of* one of these helpers is fine:
        // it only affects where the caller resumes after the helper has finished.)

        /// <summary>Processes the message with a user-provided transform function that returns an async enumerable.</summary>
        /// <param name="transformFunction">The transform function to use to process the message.</param>
        /// <param name="messageWithId">The message to be processed, paired with its reordering ID.</param>
        /// <remarks>
        /// All exceptions are caught here: non-cancellation exceptions fault the block, cooperative
        /// cancellation exceptions are swallowed. The returned task is therefore not expected to fault.
        /// </remarks>
        private async Task ProcessMessageAsync(Func<TInput, IAsyncEnumerable<TOutput>> transformFunction, KeyValuePair<TInput, long> messageWithId)
        {
            try
            {
                // Run the user transform and store the results.
                IAsyncEnumerable<TOutput> outputItems = transformFunction(messageWithId.Key);
                await StoreOutputItemsAsync(messageWithId, outputItems).ConfigureAwait(false);
            }
            catch (Exception exc)
            {
                // Either the user's transform threw or enumerating the returned collection failed.
                // If this exception represents cancellation, swallow it rather than shutting down the block.
                if (!Common.IsCooperativeCancellation(exc))
                {
                    // The exception was not for cancellation. We must add the exception before declining
                    // and signaling completion, as the exception is part of the operation, and the completion
                    // conditions depend on this.
                    Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key);
                    _target.Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
                }
            }
            finally
            {
                // Let the target know that one of the asynchronous operations it launched has completed.
                _target.SignalOneAsyncMessageCompleted();
            }
        }

        /// <summary>
        /// Stores the output items, either into the reordering buffer or into the source half.
        /// Ensures that the bounding count is correctly updated.
        /// </summary>
        /// <param name="messageWithId">The message with id.</param>
        /// <param name="outputItems">The output items to be persisted; <see langword="null"/> means no output.</param>
        private async Task StoreOutputItemsAsync(
            KeyValuePair<TInput, long> messageWithId, IAsyncEnumerable<TOutput>? outputItems)
        {
            // If there's a reordering buffer, pass the data along to it.
            // The reordering buffer will handle all details, including bounding.
            if (_reorderingBuffer is not null)
            {
                await StoreOutputItemsReorderedAsync(messageWithId.Value, outputItems).ConfigureAwait(false);
            }
            // Otherwise, output the data directly.
            else if (outputItems is not null)
            {
                await StoreOutputItemsNonReorderedWithIterationAsync(outputItems).ConfigureAwait(false);
            }
            else if (_target.IsBounded)
            {
                // outputItems is null and there's no reordering buffer
                // and we're bounding, so decrement the bounding count to
                // signify that the input element we already accounted for
                // produced no output
                _target.ChangeBoundingCount(count: -1);
            }
            // else there's no reordering buffer, there are no output items, and we're not bounded,
            // so there's nothing more to be done.
        }

        /// <summary>Stores the output for one input message using the reordering buffer.</summary>
        /// <param name="id">The reordering ID of the input message that produced the output.</param>
        /// <param name="item">The async enumerable returned by the transform; may be <see langword="null"/>.</param>
        private async Task StoreOutputItemsReorderedAsync(long id, IAsyncEnumerable<TOutput>? item)
        {
            Debug.Assert(_reorderingBuffer is not null, "Expected a reordering buffer");
            Debug.Assert(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");

            // Grab info about the transform
            TargetCore<TInput> target = _target;
            bool isBounded = target.IsBounded;

            // A null enumerable produces no output: hand the reordering buffer an invalid entry so it
            // can advance past this ID, and give back the bounding count taken for the input.
            if (item is null)
            {
                _reorderingBuffer.AddItem(id, null, false);
                if (isBounded)
                {
                    target.ChangeBoundingCount(count: -1);
                }
                return;
            }

            // By this point, either we're not the next item, in which case we need to make a copy of the
            // data and store it, or we are the next item and can store it immediately but we need to enumerate
            // the items and store them individually because we don't want to enumerate while holding a lock.
            List<TOutput>? itemCopy = null;
            try
            {
                // If this is the next item, we can output it now.
                if (_reorderingBuffer.IsNext(id))
                {
                    await StoreOutputItemsNonReorderedWithIterationAsync(item).ConfigureAwait(false);
                    // itemCopy remains null, so the AddItem call in the finally block below passes an
                    // invalid placeholder that simply completes our interaction with the reordering buffer.
                }
                else
                {
                    // We're not the next item, so copy the data into a list that the reordering buffer
                    // can hold until it's our turn. We enumerate here, outside of the reordering buffer's lock.
                    int itemCount = 0;
                    try
                    {
                        List<TOutput> buffer = new List<TOutput>();
                        await foreach (TOutput element in item.ConfigureAwait(true))
                        {
                            buffer.Add(element);
                        }

                        // Only publish the copy once enumeration has fully succeeded. If enumeration
                        // throws, itemCopy stays null, so the reordering buffer receives an invalid
                        // (dummy) entry rather than a partial set of outputs that the bounding count
                        // below would not have accounted for.
                        itemCopy = buffer;
                        itemCount = buffer.Count;
                    }
                    finally
                    {
                        // If we're here successfully, then itemCount is the number of output items
                        // we actually received, and we should update the bounding count with it.
                        // If we're here because enumeration threw an exception, then itemCount will be 0,
                        // and we still need to update the bounding count with this in order to counteract
                        // the increased bounding count for the corresponding input.
                        if (isBounded)
                        {
                            UpdateBoundingCountWithOutputCount(count: itemCount);
                        }
                    }
                }
            }
            finally
            {
                // Tell the reordering buffer that we're done.  If we already output
                // all of the data, itemCopy will be null, and we just pass down the invalid item.
                // If we haven't, pass down the real thing.  We do this even in the case of an exception,
                // in which case itemCopy is null and this will be a dummy element.
                _reorderingBuffer.AddItem(id, itemCopy, itemIsValid: itemCopy is not null);
            }
        }

        /// <summary>
        /// Stores the untrusted async enumerable into the source core.
        /// This method does not go through the reordering buffer.
        /// </summary>
        /// <param name="outputItems">The untrusted enumerable.</param>
        private async Task StoreOutputItemsNonReorderedWithIterationAsync(IAsyncEnumerable<TOutput> outputItems)
        {
            // The _source we're adding to isn't thread-safe, so we need to determine
            // whether we need to lock.  If the block is configured with a max degree
            // of parallelism of 1, then only one transform can run at a time, and so
            // we don't need to lock.  Similarly, if there's a reordering buffer, then
            // it guarantees that we're invoked serially, and we don't need to lock.
            bool isSerial =
                _target.DataflowBlockOptions.MaxDegreeOfParallelism == 1 ||
                _reorderingBuffer is not null;

            // If we're bounding, we need to increment the bounded count
            // for each individual item as we enumerate it.
            if (_target.IsBounded)
            {
                // When the input item that generated this
                // output was loaded, we incremented the bounding count.  If it only
                // output a single item, then we don't need to touch the bounding count.
                // Otherwise, we need to adjust the bounding count accordingly.
                bool outputFirstItem = false;
                try
                {
                    await foreach (TOutput item in outputItems.ConfigureAwait(true))
                    {
                        if (outputFirstItem)
                        {
                            _target.ChangeBoundingCount(count: 1);
                        }
                        outputFirstItem = true;

                        if (isSerial)
                        {
                            _source.AddMessage(item);
                        }
                        else
                        {
                            lock (ParallelSourceLock) // don't hold lock while enumerating
                            {
                                _source.AddMessage(item);
                            }
                        }
                    }
                }
                finally
                {
                    // No items were produced (or enumeration failed before the first one), so
                    // release the bounding count that was taken for the input item.
                    if (!outputFirstItem)
                    {
                        _target.ChangeBoundingCount(count: -1);
                    }
                }
            }
            // If we're not bounding, just output each individual item.
            else
            {
                if (isSerial)
                {
                    await foreach (TOutput item in outputItems.ConfigureAwait(true))
                    {
                        _source.AddMessage(item);
                    }
                }
                else
                {
                    await foreach (TOutput item in outputItems.ConfigureAwait(true))
                    {
                        lock (ParallelSourceLock) // don't hold lock while enumerating
                        {
                            _source.AddMessage(item);
                        }
                    }
                }
            }
        }
    }
}