|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchBlock.cs
//
//
// A propagator block that groups individual messages into arrays of messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a dataflow block that batches inputs into arrays.</summary>
/// <typeparam name="T">Specifies the type of data put into batches.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BatchBlock<>.DebugView))]
public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]>, IDebuggerDisplay
{
/// <summary>The target half of this batch.</summary>
private readonly BatchBlockTargetCore _target;
/// <summary>The source half of this batch.</summary>
private readonly SourceCore<T[]> _source;
/// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
public BatchBlock(int batchSize) :
this(batchSize, GroupingDataflowBlockOptions.Default)
{ }
/// <summary>Initializes this <see cref="BatchBlock{T}"/> with the specified batch size, declining option, and block options.</summary>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be no greater than the value of the BoundedCapacity option if a non-default value has been set.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BatchBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
{
// Validate arguments
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive);
if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
if (dataflowBlockOptions.BoundedCapacity > 0 && dataflowBlockOptions.BoundedCapacity < batchSize) throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity);
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Initialize bounding actions
Action<ISourceBlock<T[]>, int>? onItemsRemoved = null;
Func<ISourceBlock<T[]>, T[], IList<T[]>?, int>? itemCountingFunc = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
onItemsRemoved = static (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
itemCountingFunc = static (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
}
// Initialize source
_source = new SourceCore<T[]>(this, dataflowBlockOptions,
static owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
onItemsRemoved, itemCountingFunc);
// Initialize target
_target = new BatchBlockTargetCore(this, batchSize, _source.AddMessage, dataflowBlockOptions);
// When the target is done, let the source know it won't be getting any more data
_target.Completion.ContinueWith(delegate { _source.Complete(); },
CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchBlock<T>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception!);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchBlockTargetCore)state!).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete() { _target.Complete(exception: null, dropPendingMessages: false, releaseReservedMessages: false); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception is null)
{
throw new ArgumentNullException(nameof(exception));
}
_target.Complete(exception, dropPendingMessages: true, releaseReservedMessages: false);
}
/// <summary>
/// Triggers the <see cref="BatchBlock{T}"/> to initiate a batching operation even if the number
/// of currently queued or postponed items is less than the <see cref="BatchSize"/>.
/// </summary>
/// <remarks>
/// In greedy mode, a batch will be generated from queued items even if fewer exist than the batch size.
/// In non-greedy mode, a batch will be generated asynchronously from postponed items even if
/// fewer than the batch size can be consumed.
/// </remarks>
public void TriggerBatch() { _target.TriggerBatch(); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
{
return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public bool TryReceive(Predicate<T[]>? filter, [NotNullWhen(true)] out T[]? item)
{
return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public bool TryReceiveAll([NotNullWhen(true)] out IList<T[]>? items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <summary>Gets the size of the batches generated by this <see cref="BatchBlock{T}"/>.</summary>
/// <remarks>
/// If the number of items provided to the block is not evenly divisible by the batch size provided
/// to the block's constructor, the block's final batch may contain fewer than the requested number of items.
/// </remarks>
public int BatchSize { get { return _target.BatchSize; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T[]? ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>Gets the number of messages waiting to be offered. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent =>
$"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize = {BatchSize}, OutputCount = {OutputCountForDebugger}";
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Batch.</summary>
private sealed class DebugView
{
/// <summary>The batch block being viewed.</summary>
private readonly BatchBlock<T> _batchBlock;
/// <summary>The target half being viewed.</summary>
private readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<T[]>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="batchBlock">The batch being viewed.</param>
public DebugView(BatchBlock<T> batchBlock)
{
Debug.Assert(batchBlock != null, "Need a block with which to construct the debug view");
_batchBlock = batchBlock;
_targetDebuggingInformation = batchBlock._target.GetDebuggingInformation();
_sourceDebuggingInformation = batchBlock._source.GetDebuggingInformation();
}
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<T> InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<T[]> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of batches that have been completed.</summary>
public long BatchesCompleted { get { return _targetDebuggingInformation.NumberOfBatchesCompleted; } }
/// <summary>Gets the task being used for input processing.</summary>
public Task? TaskForInputProcessing { get { return _targetDebuggingInformation.TaskForInputProcessing; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task? TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets the size of batches generated by the block.</summary>
public int BatchSize { get { return _batchBlock.BatchSize; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_batchBlock); } }
/// <summary>Gets the messages postponed by this batch.</summary>
public QueuedMap<ISourceBlock<T>, DataflowMessageHeader>? PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<T[]> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public ITargetBlock<T[]>? NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
/// <summary>Provides the core target implementation for a Batch.</summary>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class BatchBlockTargetCore
{
/// <summary>The messages in this target.</summary>
private readonly Queue<T> _messages = new Queue<T>();
/// <summary>A task representing the completion of the block.</summary>
private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
/// <summary>Gets the object used as the incoming lock.</summary>
private object IncomingLock { get { return _completionTask; } }
/// <summary>The target that owns this target core.</summary>
private readonly BatchBlock<T> _owningBatch;
/// <summary>The batch size.</summary>
private readonly int _batchSize;
/// <summary>State used when in non-greedy mode.</summary>
private readonly NonGreedyState? _nonGreedyState;
/// <summary>Bounding state for when the block is executing in bounded mode.</summary>
private readonly BoundingState? _boundingState;
/// <summary>The options associated with this block.</summary>
private readonly GroupingDataflowBlockOptions _dataflowBlockOptions;
/// <summary>The action invoked with a completed batch.</summary>
private readonly Action<T[]> _batchCompletedAction;
/// <summary>Whether to stop accepting new messages.</summary>
private bool _decliningPermanently;
/// <summary>Whether we've completed at least one batch.</summary>
private long _batchesCompleted;
/// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
private bool _completionReserved;
/// <summary>State used only when in non-greedy mode.</summary>
private sealed class NonGreedyState
{
/// <summary>Collection of postponed messages.</summary>
internal readonly QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages;
/// <summary>A temporary array used to store data retrieved from PostponedMessages.</summary>
internal readonly KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] PostponedMessagesTemp;
/// <summary>A temporary list used in non-greedy mode when consuming postponed messages to store successfully reserved messages.</summary>
internal readonly List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> ReservedSourcesTemp;
/// <summary>Whether the next batching operation should accept fewer than BatchSize items.</summary>
/// <remarks>This value may be read not under a lock, but it must only be written to protected by the IncomingLock.</remarks>
internal bool AcceptFewerThanBatchSize;
/// <summary>The task used to process messages.</summary>
internal Task? TaskForInputProcessing;
/// <summary>Initializes the NonGreedyState.</summary>
/// <param name="batchSize">The batch size used by the BatchBlock.</param>
internal NonGreedyState(int batchSize)
{
// A non-greedy batch requires at least batchSize sources to be successful.
// Thus, we initialize our collections to be able to store at least that many elements
// in order to avoid unnecessary allocations below that point.
Debug.Assert(batchSize > 0, "A positive batch size is required");
PostponedMessages = new QueuedMap<ISourceBlock<T>, DataflowMessageHeader>(batchSize);
PostponedMessagesTemp = new KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[batchSize];
ReservedSourcesTemp = new List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>>(batchSize);
}
}
/// <summary>Initializes this target core with the specified configuration.</summary>
/// <param name="owningBatch">The owning batch target.</param>
/// <param name="batchSize">The number of items to group into a batch.</param>
/// <param name="batchCompletedAction">The delegate to invoke when a batch is completed.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchBlock{T}"/>. Assumed to be immutable.</param>
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
internal BatchBlockTargetCore(BatchBlock<T> owningBatch, int batchSize, Action<T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
{
Debug.Assert(owningBatch != null, "This batch target core must be associated with a batch block.");
Debug.Assert(batchSize >= 1, "Batch sizes must be positive.");
Debug.Assert(batchCompletedAction != null, "Completion action must be specified.");
Debug.Assert(dataflowBlockOptions != null, "Options required to configure the block.");
// Store arguments
_owningBatch = owningBatch;
_batchSize = batchSize;
_batchCompletedAction = batchCompletedAction;
_dataflowBlockOptions = dataflowBlockOptions;
// We'll be using _nonGreedyState even if we are greedy with bounding
bool boundingEnabled = dataflowBlockOptions.BoundedCapacity > 0;
if (!_dataflowBlockOptions.Greedy || boundingEnabled) _nonGreedyState = new NonGreedyState(batchSize);
if (boundingEnabled) _boundingState = new BoundingState(dataflowBlockOptions.BoundedCapacity);
}
/// <summary>
/// Triggers a batching operation even if the number of currently queued or postponed items is less than the <see cref="BatchSize"/>.
/// </summary>
internal void TriggerBatch()
{
lock (IncomingLock)
{
// If we shouldn't be doing any more work, bail. Otherwise, note that we're willing to
// accept fewer items in the next batching operation, and ensure processing is kicked off.
if (!_decliningPermanently && !_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
if (_nonGreedyState == null)
{
MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
}
else
{
_nonGreedyState.AcceptFewerThanBatchSize = true;
ProcessAsyncIfNecessary();
}
}
CompleteBlockIfPossible();
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
lock (IncomingLock)
{
// If we shouldn't be accepting more messages, don't.
if (_decliningPermanently)
{
CompleteBlockIfPossible();
return DataflowMessageStatus.DecliningPermanently;
}
// We can directly accept the message if:
// 1) we are being greedy AND we are not bounding, OR
// 2) we are being greedy AND we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
// (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
// (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
// accepting new ones directly into the queue.)
if (_dataflowBlockOptions.Greedy &&
(_boundingState == null
||
(_boundingState.CountIsLessThanBound && _nonGreedyState!.PostponedMessages.Count == 0 && _nonGreedyState.TaskForInputProcessing == null)))
{
// Consume the message from the source if necessary
if (consumeToAccept)
{
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
// Once consumed, enqueue it.
_messages.Enqueue(messageValue!);
if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
// Now start declining if the number of batches we've already made plus
// the number we can make from data already enqueued meets our quota.
if (!_decliningPermanently &&
(_batchesCompleted + (_messages.Count / _batchSize)) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
{
_decliningPermanently = true;
}
// Now that we have a message, see if we can make forward progress.
MakeBatchIfPossible(evenIfFewerThanBatchSize: false);
CompleteBlockIfPossible();
return DataflowMessageStatus.Accepted;
}
// Otherwise, we try to postpone if a source was provided
else if (source != null)
{
Debug.Assert(_nonGreedyState != null, "_nonGreedyState must have been initialized during construction in non-greedy mode.");
// We always postpone using _nonGreedyState even if we are being greedy with bounding
_nonGreedyState.PostponedMessages.Push(source, messageHeader);
// In non-greedy mode, we need to see if batch could be completed
if (!_dataflowBlockOptions.Greedy) ProcessAsyncIfNecessary();
return DataflowMessageStatus.Postponed;
}
// We can't do anything else about this message
return DataflowMessageStatus.Declined;
}
}
/// <summary>Completes/faults the block.
/// In general, it is not safe to pass releaseReservedMessages:true, because releasing of reserved messages
/// is done without taking a lock. We pass releaseReservedMessages:true only when an exception has been
/// caught inside the message processing loop which is a single instance at any given moment.</summary>
internal void Complete(Exception? exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState = false)
{
// Ensure that no new messages may be added
lock (IncomingLock)
{
// Faulting from outside is allowed until we start declining permanently.
// Faulting from inside is allowed at any time.
if (exception != null && (!_decliningPermanently || releaseReservedMessages))
{
// Record the exception in the source.
// The source, which exposes its Completion to the public will take this
// into account and will complete in Faulted state.
_owningBatch._source.AddException(exception);
}
// Drop pending messages if requested
if (dropPendingMessages) _messages.Clear();
}
// Release reserved messages if requested.
// This must be done from outside the lock.
if (releaseReservedMessages)
{
try { ReleaseReservedMessages(throwOnFirstException: false); }
catch (Exception e) { _owningBatch._source.AddException(e); }
}
// Triggering completion requires the lock
lock (IncomingLock)
{
// Revert the dirty processing state if requested
if (revertProcessingState)
{
Debug.Assert(_nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null,
"The processing state must be dirty when revertProcessingState==true.");
_nonGreedyState.TaskForInputProcessing = null;
}
// Trigger completion
_decliningPermanently = true;
CompleteBlockIfPossible();
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion { get { return _completionTask.Task; } }
/// <summary>Gets the size of the batches generated by this <see cref="BatchBlock{T}"/>.</summary>
internal int BatchSize { get { return _batchSize; } }
/// <summary>Gets whether the target has had cancellation requested or an exception has occurred.</summary>
private bool CanceledOrFaulted
{
get
{
return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || _owningBatch._source.HasExceptions;
}
}
/// <summary>Returns the available capacity to bring in postponed items. The exact values above _batchSize don't matter.</summary>
private int BoundedCapacityAvailable
{
get
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
return _boundingState != null ?
_dataflowBlockOptions.BoundedCapacity - _boundingState.CurrentCount :
_batchSize;
}
}
/// <summary>Completes the block once all completion conditions are met.</summary>
private void CompleteBlockIfPossible()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (!_completionReserved)
{
bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
bool noMoreMessages = _decliningPermanently && _messages.Count < _batchSize;
bool complete = !currentlyProcessing && (completedAllDesiredBatches || noMoreMessages || CanceledOrFaulted);
if (complete)
{
_completionReserved = true;
// Make sure the target is declining
_decliningPermanently = true;
// If we still have straggling items remaining, make them into their own batch even though there are fewer than batchSize
if (_messages.Count > 0) MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
// We need to complete the block, but we may have arrived here from an external
// call to the block. To avoid running arbitrary code in the form of
// completion task continuations in that case, do it in a separate task.
Task.Factory.StartNew(thisTargetCore =>
{
var targetCore = (BatchBlockTargetCore)thisTargetCore!;
// Release any postponed messages
List<Exception>? exceptions = null;
if (targetCore._nonGreedyState != null)
{
// Note: No locks should be held at this point
Common.ReleaseAllPostponedMessages(targetCore._owningBatch,
targetCore._nonGreedyState.PostponedMessages,
ref exceptions);
}
if (exceptions != null)
{
// It is important to migrate these exceptions to the source part of the owning batch,
// because that is the completion task that is publicly exposed.
targetCore._owningBatch._source.AddExceptions(exceptions);
}
// Target's completion task is only available internally with the sole purpose
// of releasing the task that completes the parent. Hence the actual reason
// for completing this task doesn't matter.
targetCore._completionTask.TrySetResult(default(VoidResult));
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
}
/// <summary>
/// Gets whether we should launch further synchronous or asynchronous processing
/// to create batches.
/// </summary>
private bool BatchesNeedProcessing
{
get
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
// If we're currently processing asynchronously, let that async task
// handle all work; nothing more to do here. If we're not currently processing
// but cancellation has been requested, don't do more work either.
bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
if (completedAllDesiredBatches || currentlyProcessing || CanceledOrFaulted) return false;
// Now, if it's possible to create a batch from queued items or if there are enough
// postponed items to attempt a batch, batches need processing.
int neededMessageCountToCompleteBatch = _batchSize - _messages.Count;
int boundedCapacityAvailable = BoundedCapacityAvailable;
// We have items queued up sufficient to make up a batch
if (neededMessageCountToCompleteBatch <= 0) return true;
if (_nonGreedyState != null)
{
// If a batch was triggered and we have any messages, we can create a batch from what we already have.
if (_nonGreedyState.AcceptFewerThanBatchSize && _messages.Count > 0)
return true;
// At this point, to make a batch we'll need to consume postponed messages, but we can't do
// that if we're declining all future messages.
if (_decliningPermanently)
return false;
// If a batch was triggered and there are any postponed messages to retrieve and there's room available, try.
if (_nonGreedyState.AcceptFewerThanBatchSize && _nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0)
return true;
if (_dataflowBlockOptions.Greedy)
{
// We are in greedy mode and we have postponed messages.
// (In greedy mode we only postpone due to lack of bounding capacity.)
// And now we have capacity to consume some postponed messages.
// (In greedy mode we can/should consume as many postponed messages as we can even
// if those messages are insufficient to make up a batch.)
if (_nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0) return true;
}
else
{
// We are in non-greedy mode and we have enough postponed messages and bounding capacity to make a full batch
if (_nonGreedyState.PostponedMessages.Count >= neededMessageCountToCompleteBatch &&
boundedCapacityAvailable >= neededMessageCountToCompleteBatch)
return true;
}
}
// There is no other reason to kick off a processing task
return false;
}
}
/// <summary>Called when new messages are available to be processed.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
private void ProcessAsyncIfNecessary(bool isReplacementReplica = false)
{
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (BatchesNeedProcessing)
{
ProcessAsyncIfNecessary_Slow(isReplacementReplica);
}
}
/// <summary>
/// Slow path for ProcessAsyncIfNecessary.
/// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// </summary>
private void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
{
Debug.Assert(BatchesNeedProcessing, "There must be a batch that needs processing.");
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
_nonGreedyState!.TaskForInputProcessing = new Task(static thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget!).ProcessMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
_owningBatch, _nonGreedyState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
_messages.Count + _nonGreedyState.PostponedMessages.Count);
}
// Start the task handling scheduling exceptions
Exception? exception = Common.StartTaskSafe(_nonGreedyState.TaskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
if (exception != null)
{
// Get out from under currently held locks. Complete re-acquires the locks it needs.
Task.Factory.StartNew(exc => Complete(exception: (Exception)exc!, dropPendingMessages: true, releaseReservedMessages: true, revertProcessingState: true),
exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
/// <summary>Task body used to process messages.</summary>
private void ProcessMessagesLoopCore()
{
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
try
{
int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
int timesThroughLoop = 0;
bool madeProgress;
do
{
// Determine whether a batch has been forced/triggered.
// (If the value is read as false and is set to true immediately afterwards,
// we'll simply force the next time around. The only code that can
// set the value to false is this function, after reading a true value.)
bool triggered = Volatile.Read(ref _nonGreedyState.AcceptFewerThanBatchSize);
// Retrieve postponed items:
// In non-greedy mode: Reserve + Consume
// In greedy bounded mode: Consume (without a prior reservation)
if (!_dataflowBlockOptions.Greedy) RetrievePostponedItemsNonGreedy(allowFewerThanBatchSize: triggered);
else RetrievePostponedItemsGreedyBounded(allowFewerThanBatchSize: triggered);
// Try to make a batch if there are enough buffered messages
lock (IncomingLock)
{
madeProgress = MakeBatchIfPossible(evenIfFewerThanBatchSize: triggered);
// Reset the trigger flag if:
// - We made a batch, regardless of whether it came due to a trigger or not.
// - We tried to make a batch due to a trigger, but were unable to, which
// could happen if we're unable to consume any of the postponed messages.
if (madeProgress || triggered) _nonGreedyState.AcceptFewerThanBatchSize = false;
}
timesThroughLoop++;
} while (madeProgress && timesThroughLoop < maxMessagesPerTask);
}
catch (Exception exc)
{
Complete(exc, dropPendingMessages: false, releaseReservedMessages: true);
}
finally
{
lock (IncomingLock)
{
// We're no longer processing, so null out the processing task
_nonGreedyState.TaskForInputProcessing = null;
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
ProcessAsyncIfNecessary(isReplacementReplica: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteBlockIfPossible();
}
}
}
/// <summary>Create a batch from the available items.</summary>
/// <param name="evenIfFewerThanBatchSize">
/// Whether to make a batch even if there are fewer than BatchSize items available.
/// </param>
/// <returns>true if a batch was created and published; otherwise, false.</returns>
private bool MakeBatchIfPossible(bool evenIfFewerThanBatchSize)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
// Is a full batch available?
bool fullBatch = _messages.Count >= _batchSize;
// If so, or if it's ok to make a batch with fewer than batchSize, make one.
if (fullBatch || (evenIfFewerThanBatchSize && _messages.Count > 0))
{
var newBatch = new T[fullBatch ? _batchSize : _messages.Count];
for (int i = 0; i < newBatch.Length; i++) newBatch[i] = _messages.Dequeue();
_batchCompletedAction(newBatch);
_batchesCompleted++;
if (_batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups) _decliningPermanently = true;
return true;
}
// No batch could be created
else return false;
}
/// <summary>Retrieves postponed items in non-greedy mode if we have enough to make a batch.</summary>
/// <remarks>Whether we'll accept consuming fewer elements than the defined batch size.</remarks>
private void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
{
Debug.Assert(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Shortcuts just to keep the code cleaner
QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
// Clear the temporary buffer. This is safe to do without a lock because
// it is only accessed by the serial message loop.
reserved.Clear();
int poppedInitially;
int boundedCapacityAvailable;
lock (IncomingLock)
{
// The queue must be empty between batches in non-greedy mode
Debug.Assert(_messages.Count == 0, "The queue must be empty between batches in non-greedy mode");
// If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
boundedCapacityAvailable = BoundedCapacityAvailable;
if (_decliningPermanently ||
postponed.Count == 0 ||
boundedCapacityAvailable <= 0 ||
(!allowFewerThanBatchSize && (postponed.Count < _batchSize || boundedCapacityAvailable < _batchSize)))
return;
// Grab an initial batch of postponed messages.
poppedInitially = postponed.PopRange(postponedTemp, 0, _batchSize);
Debug.Assert(allowFewerThanBatchSize ? poppedInitially > 0 : poppedInitially == _batchSize,
"We received fewer than we expected based on the previous check.");
} // Release the lock. We must not hold it while calling Reserve/Consume/Release.
// Try to reserve the initial batch of messages.
for (int i = 0; i < poppedInitially; i++)
{
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
{
var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T)!);
var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
reserved.Add(reservedSourceAndMessage);
}
}
Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long
// If we didn't reserve enough to make a batch, start picking off postponed messages
// one by one until we either have enough reserved or we run out of messages
while (reserved.Count < _batchSize)
{
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
lock (IncomingLock)
{
if (!postponed.TryPop(out sourceAndMessage)) break;
} // Release the lock. We must not hold it while calling Reserve/Consume/Release.
if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
{
var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T)!);
var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
reserved.Add(reservedSourceAndMessage);
}
}
Debug.Assert(reserved.Count <= _batchSize, "Expected the number of reserved sources to be <= the number needed for a batch.");
// We've now reserved what we can. Either consume them all or release them all.
if (reserved.Count > 0)
{
// TriggerBatch adds a complication here. It's possible that while we've been reserving
// messages, Post has been used to queue up a bunch of messages to the batch,
// and that if the batch has a max group count and enough messages were posted,
// we could now be declining. In that case, if we don't specially handle the situation,
// we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
// implies the block will only ever output a maximum number of batches. To handle this,
// we start declining before consuming, now that we know we'll have enough to form a batch.
// (If an exception occurs after we do this, we'll be shutting down the block anyway.)
// This is also why we still reserve/consume rather than just consume in forced mode,
// so that we only consume if we're able to turn what we consume into a batch.
bool shouldProceedToConsume = true;
if (allowFewerThanBatchSize)
{
lock (IncomingLock)
{
if (!_decliningPermanently &&
(_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
// Note that this logic differs from the other location where we do a similar check.
// Here we want to know whether we're one shy of meeting our quota, because we'll accept
// any size batch. Elsewhere, we need to know whether we have the right number of messages
// queued up.
{
shouldProceedToConsume = !_decliningPermanently;
_decliningPermanently = true;
}
}
}
if (shouldProceedToConsume && (allowFewerThanBatchSize || reserved.Count == _batchSize))
{
ConsumeReservedMessagesNonGreedy();
}
else
{
ReleaseReservedMessages(throwOnFirstException: true);
}
}
// Clear out the reserved list, so as not to hold onto values longer than necessary.
// We don't do this in case of failure, because the higher-level exception handler
// accesses the list to try to release reservations.
reserved.Clear();
}
/// <summary>Retrieves postponed items in greedy bounded mode.</summary>
/// <remarks>Whether we'll accept consuming fewer elements than the defined batch size.</remarks>
private void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
{
Debug.Assert(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Debug.Assert(_boundingState != null, "Bounding state is required when in bounded mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Shortcuts just to keep the code cleaner
QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
// Clear the temporary buffer. This is safe to do without a lock because
// it is only accessed by the serial message loop.
reserved.Clear();
int poppedInitially;
int boundedCapacityAvailable;
int itemCountNeededToCompleteBatch;
lock (IncomingLock)
{
// If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
boundedCapacityAvailable = BoundedCapacityAvailable;
itemCountNeededToCompleteBatch = _batchSize - _messages.Count;
if (_decliningPermanently ||
postponed.Count == 0 ||
boundedCapacityAvailable <= 0)
return;
// Grab an initial batch of postponed messages.
if (boundedCapacityAvailable < itemCountNeededToCompleteBatch) itemCountNeededToCompleteBatch = boundedCapacityAvailable;
poppedInitially = postponed.PopRange(postponedTemp, 0, itemCountNeededToCompleteBatch);
Debug.Assert(poppedInitially > 0, "We received fewer than we expected based on the previous check.");
} // Release the lock. We must not hold it while calling Reserve/Consume/Release.
// Treat popped messages as reserved.
// We don't have to formally reserve because we are in greedy mode.
for (int i = 0; i < poppedInitially; i++)
{
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T)!);
var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
reserved.Add(reservedSourceAndMessage);
}
Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long
// If we didn't reserve enough to make a batch, start picking off postponed messages
// one by one until we either have enough reserved or we run out of messages
while (reserved.Count < itemCountNeededToCompleteBatch)
{
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
lock (IncomingLock)
{
if (!postponed.TryPop(out sourceAndMessage)) break;
} // Release the lock. We must not hold it while calling Reserve/Consume/Release.
var reservedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value, default(T)!);
var reservedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, reservedMessage);
reserved.Add(reservedSourceAndMessage);
}
Debug.Assert(reserved.Count <= itemCountNeededToCompleteBatch, "Expected the number of reserved sources to be <= the number needed for a batch.");
// We've gotten as many postponed messages as we can. Try to consume them.
if (reserved.Count > 0)
{
// TriggerBatch adds a complication here. It's possible that while we've been reserving
// messages, Post has been used to queue up a bunch of messages to the batch,
// and that if the batch has a max group count and enough messages were posted,
// we could now be declining. In that case, if we don't specially handle the situation,
// we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
// implies the block will only ever output a maximum number of batches. To handle this,
// we start declining before consuming, now that we know we'll have enough to form a batch.
// (If an exception occurs after we do this, we'll be shutting down the block anyway.)
// This is also why we still reserve/consume rather than just consume in forced mode,
// so that we only consume if we're able to turn what we consume into a batch.
bool shouldProceedToConsume = true;
if (allowFewerThanBatchSize)
{
lock (IncomingLock)
{
if (!_decliningPermanently &&
(_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
// Note that this logic differs from the other location where we do a similar check.
// Here we want to know whether we're one shy of meeting our quota, because we'll accept
// any size batch. Elsewhere, we need to know whether we have the right number of messages
// queued up.
{
shouldProceedToConsume = !_decliningPermanently;
_decliningPermanently = true;
}
}
}
if (shouldProceedToConsume)
{
ConsumeReservedMessagesGreedyBounded();
}
}
// Clear out the reserved list, so as not to hold onto values longer than necessary.
// We don't do this in case of failure, because the higher-level exception handler
// accesses the list to try to release reservations.
reserved.Clear();
}
/// <summary>
/// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
/// </summary>
private void ConsumeReservedMessagesNonGreedy()
{
Debug.Assert(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Consume the reserved items and store the data.
List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
for (int i = 0; i < reserved.Count; i++)
{
// We can only store the data into _messages while holding the IncomingLock, we
// don't want to allocate extra objects for each batch, and we don't want to
// take and release the lock for each individual item... but we do need to use
// the consumed message rather than the initial one. To handle this, because KeyValuePair is immutable,
// we store a new KVP with the newly consumed message back into the temp list, so that we can
// then enumerate the temp list en mass while taking the lock once afterwards.
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (!consumed)
{
// The protocol broke down, so throw an exception, as this is fatal. Before doing so, though,
// null out all of the messages we've already consumed, as a higher-level event handler
// should try to release everything in the reserved list.
for (int prev = 0; prev < i; prev++) reserved[prev] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);
throw new InvalidOperationException(SR.InvalidOperation_FailedToConsumeReservedMessage);
}
var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue!);
var consumedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, consumedMessage);
reserved[i] = consumedSourceAndMessage;
}
lock (IncomingLock)
{
// Increment the bounding count with the number of consumed messages
if (_boundingState != null) _boundingState.CurrentCount += reserved.Count;
// Enqueue the consumed messages
foreach (KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage in reserved)
{
_messages.Enqueue(sourceAndMessage.Value.Value);
}
}
}
/// <summary>
/// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
/// </summary>
private void ConsumeReservedMessagesGreedyBounded()
{
Debug.Assert(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
Debug.Assert(_boundingState != null, "Bounded state is required for bounded mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Consume the reserved items and store the data.
int consumedCount = 0;
List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
for (int i = 0; i < reserved.Count; i++)
{
// We can only store the data into _messages while holding the IncomingLock, we
// don't want to allocate extra objects for each batch, and we don't want to
// take and release the lock for each individual item... but we do need to use
// the consumed message rather than the initial one. To handle this, because KeyValuePair is immutable,
// we store a new KVP with the newly consumed message back into the temp list, so that we can
// then enumerate the temp list en mass while taking the lock once afterwards.
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (consumed)
{
var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue!);
var consumedSourceAndMessage = new KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>(sourceAndMessage.Key, consumedMessage);
reserved[i] = consumedSourceAndMessage;
// Keep track of the actually consumed messages
consumedCount++;
}
}
lock (IncomingLock)
{
// Increment the bounding count with the number of consumed messages
if (_boundingState != null) _boundingState.CurrentCount += consumedCount;
// Enqueue the consumed messages
foreach (KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage in reserved)
{
// If we didn't consume this message, the KeyValuePai will be default, i.e. the source will be null
if (sourceAndMessage.Key != null) _messages.Enqueue(sourceAndMessage.Value.Value);
}
}
}
/// <summary>
/// Releases all of the reserved messages stored in the non-greedy state's temporary reserved source list.
/// </summary>
/// <param name="throwOnFirstException">
/// Whether to allow an exception from a release to propagate immediately,
/// or to delay propagation until all releases have been attempted.
/// </param>
internal void ReleaseReservedMessages(bool throwOnFirstException)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "Should have been initialized");
List<Exception>? exceptions = null;
List<KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>> reserved = _nonGreedyState.ReservedSourcesTemp;
for (int i = 0; i < reserved.Count; i++)
{
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>);
ISourceBlock<T> source = sourceAndMessage.Key;
KeyValuePair<DataflowMessageHeader, T> message = sourceAndMessage.Value;
if (source != null && message.Key.IsValid)
{
try { source.ReleaseReservation(message.Key, _owningBatch); }
catch (Exception e)
{
if (throwOnFirstException) throw;
exceptions ??= new List<Exception>(1);
exceptions.Add(e);
}
}
}
if (exceptions != null) throw new AggregateException(exceptions);
}
/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
internal void OnItemsRemoved(int numItemsRemoved)
{
Debug.Assert(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// If we're bounding, we need to know when an item is removed so that we
// can update the count that's mirroring the actual count in the source's queue,
// and potentially kick off processing to start consuming postponed messages.
if (_boundingState != null)
{
lock (IncomingLock)
{
// Decrement the count, which mirrors the count in the source half
Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
"It should be impossible to have a negative number of items.");
_boundingState.CurrentCount -= numItemsRemoved;
ProcessAsyncIfNecessary();
CompleteBlockIfPossible();
}
}
}
/// <summary>Counts the input items in a single output item or in a list of output items.</summary>
/// <param name="singleOutputItem">A single output item. Only considered if multipleOutputItems == null.</param>
/// <param name="multipleOutputItems">A list of output items. May be null.</param>
internal static int CountItems(T[] singleOutputItem, IList<T[]>? multipleOutputItems)
{
// If multipleOutputItems == null, then singleOutputItem is the subject of counting
if (multipleOutputItems == null) return singleOutputItem.Length;
// multipleOutputItems != null. Count the elements in each item.
int count = 0;
foreach (T[] item in multipleOutputItems) count += item.Length;
return count;
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int InputCountForDebugger { get { return _messages.Count; } }
/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this target.</returns>
internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
/// <summary>Gets the object to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent
{
get
{
var displayBatch = _owningBatch as IDebuggerDisplay;
return $"Block = \"{(displayBatch != null ? displayBatch.Content : _owningBatch)}\"";
}
}
/// <summary>Provides a wrapper for commonly needed debugging information.</summary>
internal sealed class DebuggingInformation
{
/// <summary>The target being viewed.</summary>
private readonly BatchBlockTargetCore _target;
/// <summary>Initializes the debugging helper.</summary>
/// <param name="target">The target being viewed.</param>
public DebuggingInformation(BatchBlockTargetCore target) { _target = target; }
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<T> InputQueue { get { return _target._messages.ToList(); } }
/// <summary>Gets the task being used for input processing.</summary>
public Task? TaskForInputProcessing { get { return _target._nonGreedyState?.TaskForInputProcessing; } }
/// <summary>Gets the collection of postponed messages.</summary>
public QueuedMap<ISourceBlock<T>, DataflowMessageHeader>? PostponedMessages { get { return _target._nonGreedyState?.PostponedMessages; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
/// <summary>Gets the number of batches that have been completed.</summary>
public long NumberOfBatchesCompleted { get { return _target._batchesCompleted; } }
}
}
}
}
|