|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchedJoinBlock.cs
//
//
// A propagator block that groups individual messages of multiple types
// into tuples of arrays of those messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>
/// Provides a dataflow block that batches a specified number of inputs of potentially differing types
/// provided to one or more of its targets.
/// </summary>
/// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
/// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
{
/// <summary>The size of the batches generated by this BatchedJoin.</summary>
private readonly int _batchSize;
/// <summary>State shared among the targets.</summary>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>The target providing inputs of type T1.</summary>
private readonly BatchedJoinBlockTarget<T1> _target1;
/// <summary>The target providing inputs of type T2.</summary>
private readonly BatchedJoinBlockTarget<T2> _target2;
/// <summary>The source side.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
public BatchedJoinBlock(int batchSize) :
this(batchSize, GroupingDataflowBlockOptions.Default)
{ }
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
{
// Validate arguments
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive);
if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions));
if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, nameof(dataflowBlockOptions));
// Store arguments
_batchSize = batchSize;
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Configure the source
_source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1!.Count > 0 || _target2!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages()));
}
};
// Configure the targets
_sharedResources = new BatchedJoinBlockTargetSharedResources(
batchSize, dataflowBlockOptions,
createBatchAction,
() =>
{
createBatchAction();
_source.Complete();
},
_source.AddException,
Complete);
_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchedJoinBlock<T1, T2>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception!);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
}
/// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
public int BatchSize { get { return _batchSize; } }
/// <summary>Gets a target that may be used to offer messages of the first type.</summary>
public ITargetBlock<T1> Target1 { get { return _target1; } }
/// <summary>Gets a target that may be used to offer messages of the second type.</summary>
public ITargetBlock<T2> Target2 { get { return _target2; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions)
{
return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public bool TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>>? filter, [NotNullWhen(true)] out Tuple<IList<T1>, IList<T2>>? item)
{
return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public bool TryReceiveAll([NotNullWhen(true)] out IList<Tuple<IList<T1>, IList<T2>>>? items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
Debug.Assert(_target1 != null, "_target1 not initialized");
Debug.Assert(_target2 != null, "_target2 not initialized");
_target1.Complete();
_target2.Complete();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception is null)
{
throw new ArgumentNullException(nameof(exception));
}
Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
Debug.Assert(_source != null, "_source not initialized");
lock (_sharedResources._incomingLock)
{
if (!_sharedResources._decliningPermanently) _source.AddException(exception);
}
Complete();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
Tuple<IList<T1>, IList<T2>>? ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out bool messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>
/// Invokes Complete on each target
/// </summary>
private void CompleteEachTarget()
{
_target1.Complete();
_target2.Complete();
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent =>
$"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize = {BatchSize}, OutputCount = {OutputCountForDebugger}";
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Transform.</summary>
private sealed class DebugView
{
/// <summary>The block being viewed.</summary>
private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchedJoinBlock">The batched join being viewed.</param>
public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
{
Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
_batchedJoinBlock = batchedJoinBlock;
_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
}
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of batches created.</summary>
public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
/// <summary>Gets the number of items remaining to form a batch.</summary>
public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
public int BatchSize { get { return _batchedJoinBlock._batchSize; } }
/// <summary>Gets the first target.</summary>
public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
/// <summary>Gets the second target.</summary>
public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task? TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<Tuple<IList<T1>, IList<T2>>>? NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
}
/// <summary>
/// Provides a dataflow block that batches a specified number of inputs of potentially differing types
/// provided to one or more of its targets.
/// </summary>
/// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
/// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
/// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
{
/// <summary>The size of the batches generated by this BatchedJoin.</summary>
private readonly int _batchSize;
/// <summary>State shared among the targets.</summary>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>The target providing inputs of type T1.</summary>
private readonly BatchedJoinBlockTarget<T1> _target1;
/// <summary>The target providing inputs of type T2.</summary>
private readonly BatchedJoinBlockTarget<T2> _target2;
/// <summary>The target providing inputs of type T3.</summary>
private readonly BatchedJoinBlockTarget<T3> _target3;
/// <summary>The source side.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
public BatchedJoinBlock(int batchSize) :
this(batchSize, GroupingDataflowBlockOptions.Default)
{ }
/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
{
// Validate arguments
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive);
if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
if (!dataflowBlockOptions.Greedy ||
dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
{
throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions));
}
// Store arguments
_batchSize = batchSize;
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Configure the source
_source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1!.Count > 0 || _target2!.Count > 0 || _target3!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages(), _target3!.GetAndEmptyMessages()));
}
};
// Configure the targets
_sharedResources = new BatchedJoinBlockTargetSharedResources(
batchSize, dataflowBlockOptions,
createBatchAction,
() =>
{
createBatchAction();
_source.Complete();
},
_source.AddException,
Complete);
_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
_target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception!);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
}
/// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
public int BatchSize { get { return _batchSize; } }
/// <summary>Gets a target that may be used to offer messages of the first type.</summary>
public ITargetBlock<T1> Target1 { get { return _target1; } }
/// <summary>Gets a target that may be used to offer messages of the second type.</summary>
public ITargetBlock<T2> Target2 { get { return _target2; } }
/// <summary>Gets a target that may be used to offer messages of the third type.</summary>
public ITargetBlock<T3> Target3 { get { return _target3; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
{
return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public bool TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>>? filter, [NotNullWhen(true)] out Tuple<IList<T1>, IList<T2>, IList<T3>>? item)
{
return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public bool TryReceiveAll([NotNullWhen(true)] out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>>? items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
Debug.Assert(_target1 != null, "_target1 not initialized");
Debug.Assert(_target2 != null, "_target2 not initialized");
Debug.Assert(_target3 != null, "_target3 not initialized");
_target1.Complete();
_target2.Complete();
_target3.Complete();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception is null)
{
throw new ArgumentNullException(nameof(exception));
}
Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
Debug.Assert(_source != null, "_source not initialized");
lock (_sharedResources._incomingLock)
{
if (!_sharedResources._decliningPermanently) _source.AddException(exception);
}
Complete();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
Tuple<IList<T1>, IList<T2>, IList<T3>>? ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out bool messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>
/// Invokes Complete on each target
/// </summary>
private void CompleteEachTarget()
{
_target1.Complete();
_target2.Complete();
_target3.Complete();
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent =>
$"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize = {BatchSize}, OutputCount = {OutputCountForDebugger}";
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Transform.</summary>
private sealed class DebugView
{
/// <summary>The block being viewed.</summary>
private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchedJoinBlock">The batched join being viewed.</param>
public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
{
Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
_batchedJoinBlock = batchedJoinBlock;
}
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of batches created.</summary>
public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
/// <summary>Gets the number of items remaining to form a batch.</summary>
public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }
/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
public int BatchSize { get { return _batchedJoinBlock._batchSize; } }
/// <summary>Gets the first target.</summary>
public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
/// <summary>Gets the second target.</summary>
public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
/// <summary>Gets the second target.</summary>
public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task? TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>? NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
}
}
namespace System.Threading.Tasks.Dataflow.Internal
{
/// <summary>Provides the target used in a BatchedJoin.</summary>
/// <typeparam name="T">Specifies the type of data accepted by this target.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BatchedJoinBlockTarget<>.DebugView))]
internal sealed class BatchedJoinBlockTarget<T> : ITargetBlock<T>, IDebuggerDisplay
{
/// <summary>The shared resources used by all targets associated with the same batched join instance.</summary>
private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
/// <summary>Whether this target is declining future messages.</summary>
private bool _decliningPermanently;
/// <summary>Input messages for the next batch.</summary>
private List<T> _messages = new List<T>();
/// <summary>Initializes the target.</summary>
/// <param name="sharedResources">The shared resources used by all targets associated with this batched join.</param>
internal BatchedJoinBlockTarget(BatchedJoinBlockTargetSharedResources sharedResources)
{
Debug.Assert(sharedResources != null, "Targets require a shared resources through which to communicate.");
// Store the shared resources, and register with it to let it know there's
// another target. This is done in a non-thread-safe manner and must be done
// during construction of the batched join instance.
_sharedResources = sharedResources;
sharedResources._remainingAliveTargets++;
}
/// <summary>Gets the number of messages buffered in this target.</summary>
internal int Count { get { return _messages.Count; } }
/// <summary>Gets the messages buffered by this target and then empties the collection.</summary>
/// <returns>The messages from the target.</returns>
internal IList<T> GetAndEmptyMessages()
{
Common.ContractAssertMonitorStatus(_sharedResources._incomingLock, held: true);
List<T> toReturn = _messages;
_messages = new List<T>();
return toReturn;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
lock (_sharedResources._incomingLock)
{
// If we've already stopped accepting messages, decline permanently
if (_decliningPermanently ||
_sharedResources._decliningPermanently)
return DataflowMessageStatus.DecliningPermanently;
// Consume the message from the source if necessary, and store the message
if (consumeToAccept)
{
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
_messages.Add(messageValue!);
// If this message makes a batch, notify the shared resources that a batch has been completed
if (--_sharedResources._remainingItemsInBatch == 0) _sharedResources._batchSizeReachedAction();
return DataflowMessageStatus.Accepted;
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
lock (_sharedResources._incomingLock)
{
// If this is the first time Complete is being called,
// note that there's now one fewer targets receiving messages for the batched join.
if (!_decliningPermanently)
{
_decliningPermanently = true;
if (--_sharedResources._remainingAliveTargets == 0) _sharedResources._allTargetsDecliningPermanentlyAction();
}
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception is null)
{
throw new ArgumentNullException(nameof(exception));
}
lock (_sharedResources._incomingLock)
{
if (!_decliningPermanently && !_sharedResources._decliningPermanently) _sharedResources._exceptionAction(exception);
}
_sharedResources._completionAction();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent =>
$"{Common.GetNameForDebugger(this)} InputCount = {_messages.Count}";
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Transform.</summary>
private sealed class DebugView
{
/// <summary>The batched join block target being viewed.</summary>
private readonly BatchedJoinBlockTarget<T> _batchedJoinBlockTarget;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchedJoinBlockTarget">The batched join target being viewed.</param>
public DebugView(BatchedJoinBlockTarget<T> batchedJoinBlockTarget)
{
Debug.Assert(batchedJoinBlockTarget != null, "Need a block with which to construct the debug view.");
_batchedJoinBlockTarget = batchedJoinBlockTarget;
}
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<T> InputQueue { get { return _batchedJoinBlockTarget._messages; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently
{
get
{
return _batchedJoinBlockTarget._decliningPermanently ||
_batchedJoinBlockTarget._sharedResources._decliningPermanently;
}
}
}
}
/// <summary>Provides a container for resources shared across all targets used by the same BatchedJoinBlock instance.</summary>
internal sealed class BatchedJoinBlockTargetSharedResources
{
/// <summary>Initializes the shared resources.</summary>
/// <param name="batchSize">The size of a batch to create.</param>
/// <param name="dataflowBlockOptions">The options used to configure the shared resources. Assumed to be immutable.</param>
/// <param name="batchSizeReachedAction">The action to invoke when a batch is completed.</param>
/// <param name="allTargetsDecliningAction">The action to invoke when no more targets are accepting input.</param>
/// <param name="exceptionAction">The action to invoke when an exception needs to be logged.</param>
/// <param name="completionAction">The action to invoke when completing, typically invoked due to a call to Fault.</param>
internal BatchedJoinBlockTargetSharedResources(
int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions,
Action batchSizeReachedAction, Action allTargetsDecliningAction,
Action<Exception> exceptionAction, Action completionAction)
{
Debug.Assert(batchSize >= 1, "A positive batch size is required.");
Debug.Assert(batchSizeReachedAction != null, "Need an action to invoke for each batch.");
Debug.Assert(allTargetsDecliningAction != null, "Need an action to invoke when all targets have declined.");
_incomingLock = new object();
_batchSize = batchSize;
// _remainingAliveTargets will be incremented when targets are added.
// They must be added during construction of the BatchedJoin<...>.
_remainingAliveTargets = 0;
_remainingItemsInBatch = batchSize;
// Configure what to do when batches are completed and/or all targets start declining
_allTargetsDecliningPermanentlyAction = () =>
{
// Invoke the caller's action
allTargetsDecliningAction();
// Don't accept any more messages. We should already
// be doing this anyway through each individual target's declining flag,
// so setting it to true is just a precaution and is also helpful
// when onceOnly is true.
_decliningPermanently = true;
};
_batchSizeReachedAction = () =>
{
// Invoke the caller's action
batchSizeReachedAction();
_batchesCreated++;
// If this batched join is meant to be used for only a single
// batch, invoke the completion logic.
if (_batchesCreated >= dataflowBlockOptions.ActualMaxNumberOfGroups) _allTargetsDecliningPermanentlyAction();
// Otherwise, get ready for the next batch.
else _remainingItemsInBatch = _batchSize;
};
_exceptionAction = exceptionAction;
_completionAction = completionAction;
}
/// <summary>
/// A lock used to synchronize all incoming messages on all targets. It protects all of the rest
/// of the shared Resources's state and will be held while invoking the delegates.
/// </summary>
internal readonly object _incomingLock;
/// <summary>The size of the batches to generate.</summary>
internal readonly int _batchSize;
/// <summary>The action to invoke when enough elements have been accumulated to make a batch.</summary>
internal readonly Action _batchSizeReachedAction;
/// <summary>The action to invoke when all targets are declining further messages.</summary>
internal readonly Action _allTargetsDecliningPermanentlyAction;
/// <summary>The action to invoke when an exception has to be logged.</summary>
internal readonly Action<Exception> _exceptionAction;
/// <summary>The action to invoke when the owning block has to be completed.</summary>
internal readonly Action _completionAction;
/// <summary>The number of items remaining to form a batch.</summary>
internal int _remainingItemsInBatch;
/// <summary>The number of targets still alive (i.e. not declining all further messages).</summary>
internal int _remainingAliveTargets;
/// <summary>Whether all targets should decline all further messages.</summary>
internal bool _decliningPermanently;
/// <summary>The number of batches created.</summary>
internal long _batchesCreated;
}
}
|