File: Blocks\BatchedJoinBlock.cs
Web Access
Project: src\src\libraries\System.Threading.Tasks.Dataflow\src\System.Threading.Tasks.Dataflow.csproj (System.Threading.Tasks.Dataflow)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchedJoinBlock.cs
//
//
// A propagator block that groups individual messages of multiple types
// into tuples of arrays of those messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks.Dataflow.Internal;
 
namespace System.Threading.Tasks.Dataflow
{
    /// <summary>
    /// A dataflow block that groups a specified number of inputs, possibly of differing types and
    /// offered to one or more of its two targets, into batches.
    /// </summary>
    /// <typeparam name="T1">The type of data accepted by the block's first target.</typeparam>
    /// <typeparam name="T2">The type of data accepted by the block's second target.</typeparam>
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
    public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
    {
        /// <summary>The batch size this block was constructed with.</summary>
        private readonly int _batchSize;
        /// <summary>State that both targets share.</summary>
        private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
        /// <summary>The target that receives inputs of type T1.</summary>
        private readonly BatchedJoinBlockTarget<T1> _target1;
        /// <summary>The target that receives inputs of type T2.</summary>
        private readonly BatchedJoinBlockTarget<T2> _target2;
        /// <summary>The source half of the block, which holds and propagates the produced batches.</summary>
        private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;

        /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified batch size and default options.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        public BatchedJoinBlock(int batchSize)
            : this(batchSize, GroupingDataflowBlockOptions.Default)
        {
        }

        /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified batch size and options.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
        /// <exception cref="System.ArgumentException">
        /// The <paramref name="dataflowBlockOptions"/> request non-greedy mode or a bounded capacity, neither of which is supported.
        /// </exception>
        public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            // Guard clauses. Their order decides which exception a caller with several bad arguments sees.
            if (batchSize < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive);
            }

            if (dataflowBlockOptions is null)
            {
                throw new ArgumentNullException(nameof(dataflowBlockOptions));
            }

            if (!dataflowBlockOptions.Greedy)
            {
                throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions));
            }

            if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
            {
                throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, nameof(dataflowBlockOptions));
            }

            // Remember the batch size and work from the default-or-cloned options from here on.
            _batchSize = batchSize;
            GroupingDataflowBlockOptions options = dataflowBlockOptions.DefaultOrClone();

            // Source half: its callback completes every target of the owning block.
            _source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
                this,
                options,
                static owner => ((BatchedJoinBlock<T1, T2>)owner).CompleteEachTarget());

            // Emits a batch from whatever the targets currently hold. Normally invoked for a full batch,
            // but also when no more messages will arrive and leftover items still need to be batched.
            // The targets are assigned further down, hence the null-forgiving operators.
            Action emitBatch = () =>
            {
                if (_target1!.Count <= 0 && _target2!.Count <= 0)
                {
                    return;
                }

                IList<T1> firstItems = _target1.GetAndEmptyMessages();
                IList<T2> secondItems = _target2!.GetAndEmptyMessages();
                _source.AddMessage(Tuple.Create(firstItems, secondItems));
            };

            // Target half: shared state first, then one target per input type.
            _sharedResources = new BatchedJoinBlockTargetSharedResources(
                batchSize,
                options,
                emitBatch,
                () =>
                {
                    // Flush any leftover items as a last batch, then complete the source.
                    emitBatch();
                    _source.Complete();
                },
                _source.AddException,
                Complete);
            _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
            _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);

            // The source half may fault on its own (for example because of a task scheduler exception).
            // When it does, fault the target half as well so that it drops its buffered messages and
            // releases its reservations. This does not loop forever: the implementations tolerate
            // repeated completion requests and carry over only one.
            _source.Completion.ContinueWith(
                static (antecedent, state) =>
                {
                    IDataflowBlock block = (BatchedJoinBlock<T1, T2>)state!;
                    Debug.Assert(antecedent.IsFaulted, "The source must be faulted in order to trigger a target completion.");
                    block.Fault(antecedent.Exception!);
                },
                this,
                CancellationToken.None,
                Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);

            // Asynchronous cancellation requests are handled by completing (declining on) the targets.
            Common.WireCancellationToComplete(
                options.CancellationToken,
                _source.Completion,
                static (state, _) => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(),
                this);

            DataflowEtwProvider etw = DataflowEtwProvider.Log;
            if (etw.IsEnabled())
            {
                etw.DataflowBlockCreated(this, options);
            }
        }

        /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
        public int BatchSize => _batchSize;

        /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
        public ITargetBlock<T1> Target1 => _target1;

        /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
        public ITargetBlock<T2> Target2 => _target2;

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
        public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions) =>
            _source.LinkTo(target, linkOptions);

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
        public bool TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>>? filter, [NotNullWhen(true)] out Tuple<IList<T1>, IList<T2>>? item) =>
            _source.TryReceive(filter, out item);

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
        public bool TryReceiveAll([NotNullWhen(true)] out IList<Tuple<IList<T1>, IList<T2>>>? items) =>
            _source.TryReceiveAll(out items);

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
        public int OutputCount => _source.OutputCount;

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
        public Task Completion => _source.Completion;

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
        public void Complete()
        {
            // This method is handed to the shared resources during construction, so check both targets exist.
            Debug.Assert(_target1 != null, "_target1 not initialized");
            Debug.Assert(_target2 != null, "_target2 not initialized");

            _target1.Complete();
            _target2.Complete();
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception is null)
            {
                throw new ArgumentNullException(nameof(exception));
            }

            Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
            Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
            Debug.Assert(_source != null, "_source not initialized");

            // Record the exception only while the targets are not yet declining permanently;
            // the flag is read under the shared incoming lock.
            lock (_sharedResources._incomingLock)
            {
                if (!_sharedResources._decliningPermanently)
                {
                    _source.AddException(exception);
                }
            }

            Complete();
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
        Tuple<IList<T1>, IList<T2>>? ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out bool messageConsumed) =>
            _source.ConsumeMessage(messageHeader, target, out messageConsumed);

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
        bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target) =>
            _source.ReserveMessage(messageHeader, target);

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
        void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target) =>
            _source.ReleaseReservation(messageHeader, target);

        /// <summary>
        /// Calls Complete on each of the block's targets.
        /// </summary>
        private void CompleteEachTarget()
        {
            _target1.Complete();
            _target2.Complete();
        }

        /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
        private int OutputCountForDebugger => _source.GetDebuggingInformation().OutputCount;

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
        public override string ToString() => Common.GetNameForDebugger(this, _source.DataflowBlockOptions);

        /// <summary>The text shown for this block by the debugger display attribute.</summary>
        private object DebuggerDisplayContent
        {
            get
            {
                return $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize = {BatchSize}, OutputCount = {OutputCountForDebugger}";
            }
        }

        /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
        object IDebuggerDisplay.Content => DebuggerDisplayContent;

        /// <summary>Provides a debugger type proxy for the <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
        private sealed class DebugView
        {
            /// <summary>The block being inspected.</summary>
            private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;
            /// <summary>Debugging information obtained from the source half of the inspected block.</summary>
            private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="batchedJoinBlock">The batched join being viewed.</param>
            public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
            {
                Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
                _batchedJoinBlock = batchedJoinBlock;
                _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
            }

            /// <summary>Gets the messages waiting to be received.</summary>
            public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue => _sourceDebuggingInformation.OutputQueue;
            /// <summary>Gets the number of batches created.</summary>
            public long BatchesCreated => _batchedJoinBlock._sharedResources._batchesCreated;
            /// <summary>Gets the number of items remaining to form a batch.</summary>
            public int RemainingItemsForBatch => _batchedJoinBlock._sharedResources._remainingItemsInBatch;

            /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
            public int BatchSize => _batchedJoinBlock._batchSize;
            /// <summary>Gets the first target.</summary>
            public ITargetBlock<T1> Target1 => _batchedJoinBlock._target1;
            /// <summary>Gets the second target.</summary>
            public ITargetBlock<T2> Target2 => _batchedJoinBlock._target2;

            /// <summary>Gets the task being used for output processing.</summary>
            public Task? TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;

            /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
            public GroupingDataflowBlockOptions DataflowBlockOptions => (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions;
            /// <summary>Gets whether the block is completed.</summary>
            public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
            /// <summary>Gets the block's Id.</summary>
            public int Id => Common.GetBlockId(_batchedJoinBlock);

            /// <summary>Gets the set of all targets linked from this block.</summary>
            public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets => _sourceDebuggingInformation.LinkedTargets;
            /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
            public ITargetBlock<Tuple<IList<T1>, IList<T2>>>? NextMessageReservedFor => _sourceDebuggingInformation.NextMessageReservedFor;
        }
    }
 
    /// <summary>
    /// Provides a dataflow block that batches a specified number of inputs of potentially differing types
    /// provided to one or more of its targets.
    /// </summary>
    /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
    /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
    /// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
    public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
    {
        /// <summary>The size of the batches generated by this BatchedJoin.</summary>
        private readonly int _batchSize;
        /// <summary>State shared among the targets.</summary>
        private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
        /// <summary>The target providing inputs of type T1.</summary>
        private readonly BatchedJoinBlockTarget<T1> _target1;
        /// <summary>The target providing inputs of type T2.</summary>
        private readonly BatchedJoinBlockTarget<T2> _target2;
        /// <summary>The target providing inputs of type T3.</summary>
        private readonly BatchedJoinBlockTarget<T3> _target3;
        /// <summary>The source side.</summary>
        private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;

        /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        public BatchedJoinBlock(int batchSize) :
            this(batchSize, GroupingDataflowBlockOptions.Default)
        { }

        /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
        /// <param name="batchSize">The number of items to group into a batch.</param>
        /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
        /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
        /// <exception cref="System.ArgumentException">
        /// The <paramref name="dataflowBlockOptions"/> request non-greedy mode or a bounded capacity, neither of which is supported.
        /// </exception>
        public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            // Validate arguments
            if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive);
            if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
            // The two unsupported configurations are reported separately so that the exception message
            // names the option that is actually at fault, as BatchedJoinBlock<T1, T2> does.
            if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions));
            if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, nameof(dataflowBlockOptions));

            // Store arguments
            _batchSize = batchSize;
            dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

            // Configure the source; its callback completes every target of the owning block.
            _source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
                this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());

            // The action to run when a batch should be created.  This is typically called
            // when we have a full batch, but it will also be called when we're done receiving
            // messages, and thus when there may be a few stragglers we need to make a batch out of.
            // The targets are assigned further down in this constructor, hence the null-forgiving operators.
            Action createBatchAction = () =>
            {
                if (_target1!.Count > 0 || _target2!.Count > 0 || _target3!.Count > 0)
                {
                    _source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages(), _target3!.GetAndEmptyMessages()));
                }
            };

            // Configure the targets
            _sharedResources = new BatchedJoinBlockTargetSharedResources(
                batchSize, dataflowBlockOptions,
                createBatchAction,
                () =>
                {
                    // Flush any leftover items as a final batch, then complete the source.
                    createBatchAction();
                    _source.Complete();
                },
                _source.AddException,
                Complete);
            _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
            _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
            _target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);

            // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
            // In those cases we need to fault the target half to drop its buffered messages and to release its
            // reservations. This should not create an infinite loop, because all our implementations are designed
            // to handle multiple completion requests and to carry over only one.
            _source.Completion.ContinueWith(static (completed, state) =>
            {
                var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
                Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
                thisBlock.Fault(completed.Exception!);
            }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

            // Handle async cancellation requests by declining on the target
            Common.WireCancellationToComplete(
                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
            DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
            if (etwLog.IsEnabled())
            {
                etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
            }
        }

        /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
        public int BatchSize { get { return _batchSize; } }

        /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
        public ITargetBlock<T1> Target1 { get { return _target1; } }

        /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
        public ITargetBlock<T2> Target2 { get { return _target2; } }

        /// <summary>Gets a target that may be used to offer messages of the third type.</summary>
        public ITargetBlock<T3> Target3 { get { return _target3; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
        public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
        {
            return _source.LinkTo(target, linkOptions);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
        public bool TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>>? filter, [NotNullWhen(true)] out Tuple<IList<T1>, IList<T2>, IList<T3>>? item)
        {
            return _source.TryReceive(filter, out item);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
        public bool TryReceiveAll([NotNullWhen(true)] out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>>? items) { return _source.TryReceiveAll(out items); }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
        public int OutputCount { get { return _source.OutputCount; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
        public Task Completion { get { return _source.Completion; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
        public void Complete()
        {
            // This method is handed to the shared resources during construction, so check the targets exist.
            Debug.Assert(_target1 != null, "_target1 not initialized");
            Debug.Assert(_target2 != null, "_target2 not initialized");
            Debug.Assert(_target3 != null, "_target3 not initialized");

            _target1.Complete();
            _target2.Complete();
            _target3.Complete();
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception is null)
            {
                throw new ArgumentNullException(nameof(exception));
            }

            Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
            Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
            Debug.Assert(_source != null, "_source not initialized");

            // Record the exception only while the targets are not yet declining permanently;
            // the flag is read under the shared incoming lock.
            lock (_sharedResources._incomingLock)
            {
                if (!_sharedResources._decliningPermanently) _source.AddException(exception);
            }
            Complete();
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
        Tuple<IList<T1>, IList<T2>, IList<T3>>? ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out bool messageConsumed)
        {
            return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
        bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
        {
            return _source.ReserveMessage(messageHeader, target);
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
        void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
        {
            _source.ReleaseReservation(messageHeader, target);
        }

        /// <summary>
        /// Invokes Complete on each target
        /// </summary>
        private void CompleteEachTarget()
        {
            _target1.Complete();
            _target2.Complete();
            _target3.Complete();
        }

        /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
        private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
        public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }

        /// <summary>The data to display in the debugger display attribute.</summary>
        private object DebuggerDisplayContent =>
            $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize = {BatchSize}, OutputCount = {OutputCountForDebugger}";

        /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
        object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

        /// <summary>Provides a debugger type proxy for the <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
        private sealed class DebugView
        {
            /// <summary>The block being viewed.</summary>
            private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;
            /// <summary>The source half of the block being viewed.</summary>
            private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="batchedJoinBlock">The batched join being viewed.</param>
            public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
            {
                Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
                _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
                _batchedJoinBlock = batchedJoinBlock;
            }

            /// <summary>Gets the messages waiting to be received.</summary>
            public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
            /// <summary>Gets the number of batches created.</summary>
            public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
            /// <summary>Gets the number of items remaining to form a batch.</summary>
            public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }

            /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
            public int BatchSize { get { return _batchedJoinBlock._batchSize; } }
            /// <summary>Gets the first target.</summary>
            public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
            /// <summary>Gets the second target.</summary>
            public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
            /// <summary>Gets the third target.</summary>
            public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }

            /// <summary>Gets the task being used for output processing.</summary>
            public Task? TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

            /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
            public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
            /// <summary>Gets whether the block is completed.</summary>
            public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
            /// <summary>Gets the block's Id.</summary>
            public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }

            /// <summary>Gets the set of all targets linked from this block.</summary>
            public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
            /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
            public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>? NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
        }
    }
}
 
namespace System.Threading.Tasks.Dataflow.Internal
{
    /// <summary>
    /// Provides one typed target of a BatchedJoin. Accepted messages are buffered here until
    /// they are retrieved through <see cref="GetAndEmptyMessages"/>.
    /// </summary>
    /// <typeparam name="T">Specifies the type of data accepted by this target.</typeparam>
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(BatchedJoinBlockTarget<>.DebugView))]
    internal sealed class BatchedJoinBlockTarget<T> : ITargetBlock<T>, IDebuggerDisplay
    {
        /// <summary>State shared by every target that belongs to the same batched join instance.</summary>
        private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
        /// <summary>Set once this particular target has been completed; it then declines every offer.</summary>
        private bool _decliningPermanently;
        /// <summary>Messages accepted so far that have not yet been handed out for a batch.</summary>
        private List<T> _messages = new List<T>();

        /// <summary>Initializes the target and registers it with the shared resources.</summary>
        /// <param name="sharedResources">The shared resources used by all targets associated with this batched join.</param>
        internal BatchedJoinBlockTarget(BatchedJoinBlockTargetSharedResources sharedResources)
        {
            Debug.Assert(sharedResources != null, "Targets require a shared resources through which to communicate.");

            // Registering is a plain, unsynchronized increment of the alive-target count,
            // so targets must be created while the batched join instance itself is
            // still being constructed.
            _sharedResources = sharedResources;
            _sharedResources._remainingAliveTargets++;
        }

        /// <summary>Gets the number of messages buffered in this target.</summary>
        internal int Count => _messages.Count;

        /// <summary>Hands out the buffered messages and replaces the buffer with a fresh, empty list.</summary>
        /// <returns>The list that held the messages buffered up to this call.</returns>
        internal IList<T> GetAndEmptyMessages()
        {
            // Callers are expected to already hold the shared incoming lock.
            Common.ContractAssertMonitorStatus(_sharedResources._incomingLock, held: true);

            List<T> drained = _messages;
            _messages = new List<T>();
            return drained;
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
        {
            // Reject malformed offers before taking the lock.
            if (!messageHeader.IsValid)
            {
                throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
            }

            if (consumeToAccept && source is null)
            {
                throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
            }

            lock (_sharedResources._incomingLock)
            {
                // Either this target or the batched join as a whole may have stopped accepting.
                if (_decliningPermanently || _sharedResources._decliningPermanently)
                {
                    return DataflowMessageStatus.DecliningPermanently;
                }

                // When asked to, pull the actual value from the source; it may no longer be available.
                if (consumeToAccept)
                {
                    Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                    messageValue = source.ConsumeMessage(messageHeader, this, out bool consumed)!;
                    if (!consumed)
                    {
                        return DataflowMessageStatus.NotAvailable;
                    }
                }

                _messages.Add(messageValue!);

                // Count the message against the current batch and signal when the batch is full.
                _sharedResources._remainingItemsInBatch--;
                if (_sharedResources._remainingItemsInBatch == 0)
                {
                    _sharedResources._batchSizeReachedAction();
                }

                return DataflowMessageStatus.Accepted;
            }
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
        public void Complete()
        {
            lock (_sharedResources._incomingLock)
            {
                // Only the first call retires this target; later calls do nothing.
                if (_decliningPermanently)
                {
                    return;
                }

                _decliningPermanently = true;

                // One fewer target is alive. When none remain, notify through the shared resources.
                int stillAlive = --_sharedResources._remainingAliveTargets;
                if (stillAlive == 0)
                {
                    _sharedResources._allTargetsDecliningPermanentlyAction();
                }
            }
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception is null)
            {
                throw new ArgumentNullException(nameof(exception));
            }

            lock (_sharedResources._incomingLock)
            {
                // The exception is passed on only while both this target and the shared
                // state are still accepting input.
                bool stillAccepting = !_decliningPermanently && !_sharedResources._decliningPermanently;
                if (stillAccepting)
                {
                    _sharedResources._exceptionAction(exception);
                }
            }

            // The completion action runs after the lock is released, regardless of
            // whether the exception was passed on above.
            _sharedResources._completionAction();
        }

        /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
        Task IDataflowBlock.Completion => throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);

        /// <summary>The data to display in the debugger display attribute.</summary>
        private object DebuggerDisplayContent
        {
            get { return $"{Common.GetNameForDebugger(this)} InputCount = {_messages.Count}"; }
        }

        /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
        object IDebuggerDisplay.Content => DebuggerDisplayContent;

        /// <summary>Provides a debugger type proxy for the batched join target.</summary>
        private sealed class DebugView
        {
            /// <summary>The batched join block target being viewed.</summary>
            private readonly BatchedJoinBlockTarget<T> _batchedJoinBlockTarget;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="batchedJoinBlockTarget">The batched join target being viewed.</param>
            public DebugView(BatchedJoinBlockTarget<T> batchedJoinBlockTarget)
            {
                Debug.Assert(batchedJoinBlockTarget != null, "Need a block with which to construct the debug view.");
                _batchedJoinBlockTarget = batchedJoinBlockTarget;
            }

            /// <summary>Gets the messages currently buffered by the target.</summary>
            public IEnumerable<T> InputQueue => _batchedJoinBlockTarget._messages;

            /// <summary>Gets whether the target, or the shared state it belongs to, is declining further messages.</summary>
            public bool IsDecliningPermanently =>
                _batchedJoinBlockTarget._decliningPermanently ||
                _batchedJoinBlockTarget._sharedResources._decliningPermanently;
        }
    }
 
    /// <summary>Holds the state and callbacks shared by all targets of a single BatchedJoinBlock instance.</summary>
    internal sealed class BatchedJoinBlockTargetSharedResources
    {
        /// <summary>
        /// A lock used to synchronize all incoming messages on all targets. It protects the rest of
        /// the shared state. BatchedJoinBlockTarget invokes the batch-size-reached, all-targets-declining
        /// and exception actions while holding it, and the completion action after releasing it.
        /// </summary>
        internal readonly object _incomingLock;
        /// <summary>The size of the batches to generate.</summary>
        internal readonly int _batchSize;

        /// <summary>The action to invoke when enough elements have been accumulated to make a batch.</summary>
        internal readonly Action _batchSizeReachedAction;
        /// <summary>The action to invoke when all targets are declining further messages.</summary>
        internal readonly Action _allTargetsDecliningPermanentlyAction;
        /// <summary>The action to invoke when an exception has to be logged.</summary>
        internal readonly Action<Exception> _exceptionAction;
        /// <summary>The action to invoke when the owning block has to be completed.</summary>
        internal readonly Action _completionAction;

        /// <summary>The number of items remaining to form a batch.</summary>
        internal int _remainingItemsInBatch;
        /// <summary>The number of targets still alive (i.e. not declining all further messages).</summary>
        internal int _remainingAliveTargets;
        /// <summary>Whether all targets should decline all further messages.</summary>
        internal bool _decliningPermanently;
        /// <summary>The number of batches created.</summary>
        internal long _batchesCreated;

        /// <summary>Initializes the shared resources.</summary>
        /// <param name="batchSize">The size of a batch to create.</param>
        /// <param name="dataflowBlockOptions">The options used to configure the shared resources.  Assumed to be immutable.</param>
        /// <param name="batchSizeReachedAction">The action to invoke when a batch is completed.</param>
        /// <param name="allTargetsDecliningAction">The action to invoke when no more targets are accepting input.</param>
        /// <param name="exceptionAction">The action to invoke when an exception needs to be logged.</param>
        /// <param name="completionAction">The action to invoke when completing, typically invoked due to a call to Fault.</param>
        internal BatchedJoinBlockTargetSharedResources(
            int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions,
            Action batchSizeReachedAction, Action allTargetsDecliningAction,
            Action<Exception> exceptionAction, Action completionAction)
        {
            Debug.Assert(batchSize >= 1, "A positive batch size is required.");
            Debug.Assert(batchSizeReachedAction != null, "Need an action to invoke for each batch.");
            Debug.Assert(allTargetsDecliningAction != null, "Need an action to invoke when all targets have declined.");

            _incomingLock = new object();
            _batchSize = batchSize;
            _remainingItemsInBatch = batchSize;

            // Starts at zero; each target increments this as it is constructed, which
            // must happen during construction of the BatchedJoin<...>.
            _remainingAliveTargets = 0;

            // These two are stored as provided; the two below wrap the caller's delegates.
            _exceptionAction = exceptionAction;
            _completionAction = completionAction;

            _allTargetsDecliningPermanentlyAction = () =>
            {
                // Run the caller's action first.
                allTargetsDecliningAction();

                // Then stop accepting at the shared level as well. When every target has
                // completed, each target's own flag already makes it decline, so this is a
                // precaution. When this runs because the group limit was reached (see the
                // batch action below), the shared flag is what makes the targets decline.
                _decliningPermanently = true;
            };

            _batchSizeReachedAction = () =>
            {
                // Run the caller's action and count the batch.
                batchSizeReachedAction();
                _batchesCreated++;

                if (_batchesCreated < dataflowBlockOptions.ActualMaxNumberOfGroups)
                {
                    // More batches are allowed: reset the countdown for the next one.
                    _remainingItemsInBatch = _batchSize;
                }
                else
                {
                    // The configured number of groups has been produced: run the completion logic.
                    _allTargetsDecliningPermanentlyAction();
                }
            };
        }
    }
}