|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TargetCore.cs
//
//
// The core implementation of a standard ITargetBlock<TInput>.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
namespace System.Threading.Tasks.Dataflow.Internal
{
// LOCK-LEVELING SCHEME
// --------------------
// TargetCore employs a single lock: IncomingLock. This lock must not be used when calling out to any targets,
// which TargetCore should not have, anyway. It also must not be held when calling back to any sources, except
// during calls to OfferMessage from that same source.
/// <summary>Options used to configure a target core.</summary>
[Flags]
internal enum TargetCoreOptions : byte
{
/// <summary>Synchronous completion, both a target and a source, etc.</summary>
None = 0x0,
/// <summary>Whether the block relies on the delegate to signal when an async operation has completed.</summary>
UsesAsyncCompletion = 0x1,
/// <summary>
/// Whether the block containing this target core is just a target or also has a source side.
/// If it's just a target, then this target core's completion represents the entire block's completion.
/// </summary>
RepresentsBlockCompletion = 0x2
}
/// <summary>
/// Provides a core implementation of <see cref="ITargetBlock{TInput}"/>.</summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="TargetCore{TInput}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
internal sealed class TargetCore<TInput>
{
// *** These fields are readonly and are initialized at AppDomain startup.
/// <summary>Caching the keep alive predicate.</summary>
private static readonly Common.KeepAlivePredicate<TargetCore<TInput>, KeyValuePair<TInput, long>> _keepAlivePredicate =
(TargetCore<TInput> thisTargetCore, out KeyValuePair<TInput, long> messageWithId) =>
thisTargetCore.TryGetNextAvailableOrPostponedMessage(out messageWithId);
// *** These fields are readonly and are initialized to new instances at construction.
/// <summary>A task representing the completion of the block.</summary>
private readonly TaskCompletionSource<VoidResult> _completionSource = new TaskCompletionSource<VoidResult>();
// *** These fields are readonly and are initialized by arguments to the constructor.
/// <summary>The target block using this helper.</summary>
private readonly ITargetBlock<TInput> _owningTarget;
/// <summary>The messages in this target.</summary>
/// <remarks>This field doubles as the IncomingLock.</remarks>
private readonly IProducerConsumerQueue<KeyValuePair<TInput, long>> _messages;
/// <summary>The options associated with this block.</summary>
private readonly ExecutionDataflowBlockOptions _dataflowBlockOptions;
/// <summary>An action to invoke for every accepted message.</summary>
private readonly Action<KeyValuePair<TInput, long>> _callAction;
/// <summary>Whether the block relies on the delegate to signal when an async operation has completed.</summary>
private readonly TargetCoreOptions _targetCoreOptions;
/// <summary>Bounding state for when the block is executing in bounded mode.</summary>
private readonly BoundingStateWithPostponed<TInput>? _boundingState;
/// <summary>The reordering buffer used by the owner. May be null.</summary>
private readonly IReorderingBuffer? _reorderingBuffer;
/// <summary>Gets the object used as the incoming lock.</summary>
private object IncomingLock { get { return _messages; } }
// *** These fields are mutated during execution.
/// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
private List<Exception>? _exceptions;
/// <summary>Whether to stop accepting new messages.</summary>
private bool _decliningPermanently;
/// <summary>The number of operations (including service tasks) currently running asynchronously.</summary>
/// <remarks>Must always be accessed from inside a lock.</remarks>
private int _numberOfOutstandingOperations;
/// <summary>The number of service tasks in async mode currently running.</summary>
/// <remarks>Must always be accessed from inside a lock.</remarks>
private int _numberOfOutstandingServiceTasks;
/// <summary>The next available ID we can assign to a message about to be processed.</summary>
private PaddedInt64 _nextAvailableInputMessageId; // initialized to 0... very important for a reordering buffer
/// <summary>A task has reserved the right to run the completion routine.</summary>
private bool _completionReserved;
/// <summary>This counter is set by the processing loop to prevent itself from trying to keep alive.</summary>
private int _keepAliveBanCounter;
/// <summary>Initializes the target core.</summary>
/// <param name="owningTarget">The target using this helper.</param>
/// <param name="callAction">An action to invoke for all accepted items.</param>
/// <param name="reorderingBuffer">The reordering buffer used by the owner; may be null.</param>
/// <param name="dataflowBlockOptions">The options to use to configure this block. The target core assumes these options are immutable.</param>
/// <param name="targetCoreOptions">Options for how the target core should behave.</param>
internal TargetCore(
ITargetBlock<TInput> owningTarget,
Action<KeyValuePair<TInput, long>> callAction,
IReorderingBuffer? reorderingBuffer,
ExecutionDataflowBlockOptions dataflowBlockOptions,
TargetCoreOptions targetCoreOptions)
{
// Validate internal arguments
Debug.Assert(owningTarget != null, "Core must be associated with a target block.");
Debug.Assert(dataflowBlockOptions != null, "Options must be provided to configure the core.");
Debug.Assert(callAction != null, "Action to invoke for each item is required.");
// Store arguments and do additional initialization
_owningTarget = owningTarget;
_callAction = callAction;
_reorderingBuffer = reorderingBuffer;
_dataflowBlockOptions = dataflowBlockOptions;
_targetCoreOptions = targetCoreOptions;
_messages = (dataflowBlockOptions.MaxDegreeOfParallelism == 1) ?
(IProducerConsumerQueue<KeyValuePair<TInput, long>>)new SingleProducerSingleConsumerQueue<KeyValuePair<TInput, long>>() :
(IProducerConsumerQueue<KeyValuePair<TInput, long>>)new MultiProducerMultiConsumerQueue<KeyValuePair<TInput, long>>();
if (_dataflowBlockOptions.BoundedCapacity != System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded)
{
Debug.Assert(_dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
_boundingState = new BoundingStateWithPostponed<TInput>(_dataflowBlockOptions.BoundedCapacity);
}
}
/// <summary>Internal Complete entry point with extra parameters for different contexts.</summary>
/// <param name="exception">If not null, the block will be faulted.</param>
/// <param name="dropPendingMessages">If true, any unprocessed input messages will be dropped.</param>
/// <param name="storeExceptionEvenIfAlreadyCompleting">If true, an exception will be stored after _decliningPermanently has been set to true.</param>
/// <param name="unwrapInnerExceptions">If true, exception will be treated as an AggregateException.</param>
/// <param name="revertProcessingState">Indicates whether the processing state is dirty and has to be reverted.</param>
internal void Complete(Exception? exception, bool dropPendingMessages, bool storeExceptionEvenIfAlreadyCompleting = false,
bool unwrapInnerExceptions = false, bool revertProcessingState = false)
{
Debug.Assert(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
"Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
// Ensure that no new messages may be added
lock (IncomingLock)
{
// Faulting from outside is allowed until we start declining permanently.
// Faulting from inside is allowed at any time.
if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
{
Debug.Assert(_numberOfOutstandingOperations > 0 || !storeExceptionEvenIfAlreadyCompleting,
"Calls with storeExceptionEvenIfAlreadyCompleting==true may only be coming from processing task.");
Common.AddException(ref _exceptions, exception, unwrapInnerExceptions);
}
// Clear the messages queue if requested
if (dropPendingMessages)
{
KeyValuePair<TInput, long> dummy;
while (_messages.TryDequeue(out dummy)) ;
}
// Revert the dirty processing state if requested
if (revertProcessingState)
{
Debug.Assert(_numberOfOutstandingOperations > 0 && (!UsesAsyncCompletion || _numberOfOutstandingServiceTasks > 0),
"The processing state must be dirty when revertProcessingState==true.");
_numberOfOutstandingOperations--;
if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks--;
}
// Trigger completion
_decliningPermanently = true;
CompleteBlockIfPossible();
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput>? source, bool consumeToAccept)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
lock (IncomingLock)
{
// If we shouldn't be accepting more messages, don't.
if (_decliningPermanently)
{
CompleteBlockIfPossible();
return DataflowMessageStatus.DecliningPermanently;
}
// We can directly accept the message if:
// 1) we are not bounding, OR
// 2) we are bounding AND there is room available AND there are no postponed messages AND no messages are currently being transferred to the input queue.
// (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
// (Unlike all other blocks, TargetCore can accept messages while processing, because
// input message IDs are properly assigned and the correct order is preserved.)
if (_boundingState == null ||
(_boundingState.OutstandingTransfers == 0 && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0))
{
// Consume the message from the source if necessary
if (consumeToAccept)
{
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, _owningTarget, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
// Assign a message ID - strictly sequential, no gaps.
// Once consumed, enqueue the message with its ID and kick off asynchronous processing.
long messageId = _nextAvailableInputMessageId.Value++;
Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
_messages.Enqueue(new KeyValuePair<TInput, long>(messageValue!, messageId));
ProcessAsyncIfNecessary();
return DataflowMessageStatus.Accepted;
}
// Otherwise, we try to postpone if a source was provided
else if (source != null)
{
Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
"PostponedMessages must have been initialized during construction in non-greedy mode.");
// Store the message's info and kick off asynchronous processing
_boundingState.PostponedMessages.Push(source, messageHeader);
ProcessAsyncIfNecessary();
return DataflowMessageStatus.Postponed;
}
// We can't do anything else about this message
return DataflowMessageStatus.Declined;
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion { get { return _completionSource.Task; } }
/// <summary>Gets the number of items waiting to be processed by this target.</summary>
internal int InputCount { get { return _messages.GetCountSafe(IncomingLock); } }
/// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
internal void SignalOneAsyncMessageCompleted()
{
SignalOneAsyncMessageCompleted(boundingCountChange: 0);
}
/// <summary>Signals to the target core that a previously launched asynchronous operation has now completed.</summary>
/// <param name="boundingCountChange">The number of elements by which to change the bounding count, if bounding is occurring.</param>
internal void SignalOneAsyncMessageCompleted(int boundingCountChange)
{
lock (IncomingLock)
{
// We're no longer processing, so decrement the DOP counter
Debug.Assert(_numberOfOutstandingOperations > 0, "Operations may only be completed if any are outstanding.");
if (_numberOfOutstandingOperations > 0) _numberOfOutstandingOperations--;
// Fix up the bounding count if necessary
if (_boundingState != null && boundingCountChange != 0)
{
Debug.Assert(boundingCountChange <= 0 && _boundingState.CurrentCount + boundingCountChange >= 0,
"Expected a negative bounding change and not to drop below zero.");
_boundingState.CurrentCount += boundingCountChange;
}
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
ProcessAsyncIfNecessary(repeat: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteBlockIfPossible();
}
}
/// <summary>Gets whether this instance has been constructed for async processing.</summary>
private bool UsesAsyncCompletion
{
get
{
return (_targetCoreOptions & TargetCoreOptions.UsesAsyncCompletion) != 0;
}
}
/// <summary>Gets whether there's room to launch more processing operations.</summary>
private bool HasRoomForMoreOperations
{
get
{
Debug.Assert(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
Debug.Assert(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
Debug.Assert(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
// In async mode, we increment _numberOfOutstandingOperations before we start
// our own processing loop which should not count towards the MaxDOP.
return (_numberOfOutstandingOperations - _numberOfOutstandingServiceTasks) < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
}
}
/// <summary>Gets whether there's room to launch more service tasks for doing/launching processing operations.</summary>
private bool HasRoomForMoreServiceTasks
{
get
{
Debug.Assert(_numberOfOutstandingOperations >= 0, "Number of outstanding operations should never be negative.");
Debug.Assert(_numberOfOutstandingServiceTasks >= 0, "Number of outstanding service tasks should never be negative.");
Debug.Assert(_numberOfOutstandingOperations >= _numberOfOutstandingServiceTasks, "Number of outstanding service tasks should never exceed the number of outstanding operations.");
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (!UsesAsyncCompletion)
{
// Sync mode:
// We don't count service tasks, because our tasks are counted as operations.
// Therefore, return HasRoomForMoreOperations.
return HasRoomForMoreOperations;
}
else
{
// Async mode:
// We allow up to MaxDOP true service tasks.
// Checking whether there is room for more processing operations is not necessary,
// but doing so will help us avoid spinning up a task that will go away without
// launching any processing operation.
return HasRoomForMoreOperations &&
_numberOfOutstandingServiceTasks < _dataflowBlockOptions.ActualMaxDegreeOfParallelism;
}
}
}
/// <summary>Called when new messages are available to be processed.</summary>
/// <param name="repeat">Whether this call is the continuation of a previous message loop.</param>
private void ProcessAsyncIfNecessary(bool repeat = false)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (HasRoomForMoreServiceTasks)
{
ProcessAsyncIfNecessary_Slow(repeat);
}
}
/// <summary>
/// Slow path for ProcessAsyncIfNecessary.
/// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// </summary>
private void ProcessAsyncIfNecessary_Slow(bool repeat)
{
Debug.Assert(HasRoomForMoreServiceTasks, "There must be room to process asynchronously.");
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
// Determine preconditions to launching a processing task
bool messagesAvailableOrPostponed =
!_messages.IsEmpty ||
(!_decliningPermanently && _boundingState != null && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count > 0);
// If all conditions are met, launch away
if (messagesAvailableOrPostponed && !CanceledOrFaulted)
{
// Any book keeping related to the processing task like incrementing the
// DOP counter or eventually recording the tasks reference must be done
// before the task starts. That is because the task itself will do the
// reverse operation upon its completion.
_numberOfOutstandingOperations++;
if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks++;
var taskForInputProcessing = new Task(static thisTargetCore => ((TargetCore<TInput>)thisTargetCore!).ProcessMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(repeat));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
_owningTarget, taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
_messages.Count + (_boundingState != null ? _boundingState.PostponedMessages.Count : 0));
}
// Start the task handling scheduling exceptions
Exception? exception = Common.StartTaskSafe(taskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
if (exception != null)
{
// Get out from under currently held locks. Complete re-acquires the locks it needs.
Task.Factory.StartNew(exc => Complete(exception: (Exception)exc!, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true,
unwrapInnerExceptions: false, revertProcessingState: true),
exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
}
/// <summary>Task body used to process messages.</summary>
private void ProcessMessagesLoopCore()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
KeyValuePair<TInput, long> messageWithId = default(KeyValuePair<TInput, long>);
try
{
bool useAsyncCompletion = UsesAsyncCompletion;
bool shouldAttemptPostponedTransfer = _boundingState != null && _boundingState.BoundedCapacity > 1;
int numberOfMessagesProcessedByThisTask = 0;
int numberOfMessagesProcessedSinceTheLastKeepAlive = 0;
int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
while (numberOfMessagesProcessedByThisTask < maxMessagesPerTask && !CanceledOrFaulted)
{
// If we're bounding, try to transfer a message from the postponed queue
// to the input queue. This enables us to more quickly unblock sources
// sending data to the block (otherwise, no postponed messages will be consumed
// until the input queue is entirely empty). If the bounded size is 1,
// there's no need to transfer, as attempting to get the next message will
// just go and consume the postponed message anyway, and we'll save
// the extra trip through the _messages queue.
KeyValuePair<TInput, long> transferMessageWithId;
if (shouldAttemptPostponedTransfer &&
TryConsumePostponedMessage(forPostponementTransfer: true, result: out transferMessageWithId))
{
lock (IncomingLock)
{
Debug.Assert(
_boundingState!.OutstandingTransfers > 0
&& _boundingState.OutstandingTransfers <= _dataflowBlockOptions.ActualMaxDegreeOfParallelism,
"Expected TryConsumePostponedMessage to have incremented the count and for the count to not exceed the DOP.");
_boundingState.OutstandingTransfers--; // was incremented in TryConsumePostponedMessage
_messages.Enqueue(transferMessageWithId);
ProcessAsyncIfNecessary();
}
}
if (useAsyncCompletion)
{
// Get the next message if DOP is available.
// If we can't get a message or DOP is not available, bail out.
if (!TryGetNextMessageForNewAsyncOperation(out messageWithId)) break;
}
else
{
// Try to get a message for sequential execution, i.e. without checking DOP availability
if (!TryGetNextAvailableOrPostponedMessage(out messageWithId))
{
// Try to keep the task alive only if MaxDOP=1
if (_dataflowBlockOptions.MaxDegreeOfParallelism != 1) break;
// If this task has processed enough messages without being kept alive,
// it has served its purpose. Don't keep it alive.
if (numberOfMessagesProcessedSinceTheLastKeepAlive > Common.KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD) break;
// If keep alive is banned, don't attempt it
if (_keepAliveBanCounter > 0)
{
_keepAliveBanCounter--;
break;
}
// Reset the keep alive counter. (Keep this line together with TryKeepAliveUntil.)
numberOfMessagesProcessedSinceTheLastKeepAlive = 0;
// Try to keep the task alive briefly until a new message arrives
if (!Common.TryKeepAliveUntil(_keepAlivePredicate, this, out messageWithId))
{
// Keep alive was unsuccessful.
// Therefore ban further attempts temporarily.
_keepAliveBanCounter = Common.KEEP_ALIVE_BAN_COUNT;
break;
}
}
}
// We have popped a message from the queue.
// So increment the counter of processed messages.
numberOfMessagesProcessedByThisTask++;
numberOfMessagesProcessedSinceTheLastKeepAlive++;
// Invoke the user action
_callAction(messageWithId);
}
}
catch (Exception exc)
{
Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key);
Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
}
finally
{
lock (IncomingLock)
{
// We incremented _numberOfOutstandingOperations before we launched this task.
// So we must decremented it before exiting.
// Note that each async task additionally incremented it before starting and
// is responsible for decrementing it prior to exiting.
Debug.Assert(_numberOfOutstandingOperations > 0, "Expected a positive number of outstanding operations, since we're completing one here.");
_numberOfOutstandingOperations--;
// If we are in async mode, we've also incremented _numberOfOutstandingServiceTasks.
// Now it's time to decrement it.
if (UsesAsyncCompletion)
{
Debug.Assert(_numberOfOutstandingServiceTasks > 0, "Expected a positive number of outstanding service tasks, since we're completing one here.");
_numberOfOutstandingServiceTasks--;
}
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
ProcessAsyncIfNecessary(repeat: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteBlockIfPossible();
}
}
}
/// <summary>Retrieves the next message from the input queue for the useAsyncCompletion mode.</summary>
/// <param name="messageWithId">The next message retrieved.</param>
/// <returns>true if a message was found and removed; otherwise, false.</returns>
private bool TryGetNextMessageForNewAsyncOperation(out KeyValuePair<TInput, long> messageWithId)
{
Debug.Assert(UsesAsyncCompletion, "Only valid to use when in async mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
bool parallelismAvailable;
lock (IncomingLock)
{
// If we have room for another asynchronous operation, reserve it.
// If later it turns out that we had no work to fill the slot, we'll undo the addition.
parallelismAvailable = HasRoomForMoreOperations;
if (parallelismAvailable) ++_numberOfOutstandingOperations;
}
messageWithId = default(KeyValuePair<TInput, long>);
if (parallelismAvailable)
{
// If a parallelism slot was available, try to get an item.
// Be careful, because an exception may be thrown from ConsumeMessage
// and we have already incremented _numberOfOutstandingOperations.
bool gotMessage;
try
{
gotMessage = TryGetNextAvailableOrPostponedMessage(out messageWithId);
}
catch
{
// We have incremented the counter, but we didn't get a message.
// So we must undo the increment and eventually complete the block.
SignalOneAsyncMessageCompleted();
// Re-throw the exception. The processing loop will catch it.
throw;
}
// There may not be an error, but may have still failed to get a message.
// So we must undo the increment and eventually complete the block.
if (!gotMessage) SignalOneAsyncMessageCompleted();
return gotMessage;
}
// If there was no parallelism available, we didn't increment _numberOfOutstandingOperations.
// So there is nothing to do except to return false.
return false;
}
/// <summary>
/// Either takes the next available message from the input queue or retrieves a postponed
/// message from a source, based on whether we're in greedy or non-greedy mode.
/// </summary>
/// <param name="messageWithId">The retrieved item with its Id.</param>
/// <returns>true if a message could be removed and returned; otherwise, false.</returns>
private bool TryGetNextAvailableOrPostponedMessage(out KeyValuePair<TInput, long> messageWithId)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// First try to get a message from our input buffer.
if (_messages.TryDequeue(out messageWithId))
{
return true;
}
// If we can't, but if we have any postponed messages due to bounding, then
// try to consume one of these postponed messages.
// Since we are not currently holding the lock, it is possible that new messages get queued up
// by the time we take the lock to manipulate _boundingState. So we have to double-check the
// input queue once we take the lock before we consider postponed messages.
else if (_boundingState != null && TryConsumePostponedMessage(forPostponementTransfer: false, result: out messageWithId))
{
return true;
}
// Otherwise, there's no message available.
else
{
messageWithId = default(KeyValuePair<TInput, long>);
return false;
}
}
/// <summary>Consumes a single postponed message.</summary>
/// <param name="forPostponementTransfer">
/// true if the method is being called to consume a message that'll then be stored into the input queue;
/// false if the method is being called to consume a message that'll be processed immediately.
/// If true, the bounding state's ForcePostponement will be updated.
/// If false, the method will first try (while holding the lock) to consume from the input queue before
/// consuming a postponed message.
/// </param>
/// <param name="result">The consumed message.</param>
/// <returns>true if a message was consumed; otherwise, false.</returns>
private bool TryConsumePostponedMessage(
bool forPostponementTransfer,
out KeyValuePair<TInput, long> result)
{
Debug.Assert(
_dataflowBlockOptions.BoundedCapacity !=
System.Threading.Tasks.Dataflow.DataflowBlockOptions.Unbounded, "Only valid to use when in bounded mode.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Iterate until we either consume a message successfully or there are no more postponed messages.
bool stateOptimisticallyUpdatedForConsumedMessage = false;
long messageId = Common.INVALID_REORDERING_ID;
while (true)
{
KeyValuePair<ISourceBlock<TInput>, DataflowMessageHeader> element;
lock (IncomingLock)
{
// If we are declining permanently, don't consume postponed messages.
if (_decliningPermanently) break;
// New messages may have been queued up while we weren't holding the lock.
// In particular, the input queue may have been filled up and messages may have
// gotten postponed. If we process such a postponed message, we would mess up the
// order. Therefore, we have to double-check the input queue first.
if (!forPostponementTransfer && _messages.TryDequeue(out result))
{
// We got a message. If on a previous iteration of this loop we allocated a
// message ID, we need to inform the reordering buffer (if there is one) that
// the message ID will never used (since the message we got already has its
// own ID assigned).
if (stateOptimisticallyUpdatedForConsumedMessage)
{
_reorderingBuffer?.IgnoreItem(messageId);
}
return true;
}
// We can consume a message to process if there's one to process and also if
// if we have logical room within our bound for the message.
if (!_boundingState!.CountIsLessThanBound || !_boundingState.PostponedMessages.TryPop(out element))
{
break;
}
if (!stateOptimisticallyUpdatedForConsumedMessage)
{
stateOptimisticallyUpdatedForConsumedMessage = true;
messageId = _nextAvailableInputMessageId.Value++; // optimistically assign an ID
Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid.");
_boundingState.CurrentCount += 1; // optimistically take bounding space
if (forPostponementTransfer)
{
Debug.Assert(_boundingState.OutstandingTransfers >= 0, "Expected TryConsumePostponedMessage to not be negative.");
_boundingState.OutstandingTransfers++; // temporarily force postponement until we've successfully consumed the element
}
}
} // Must not call to source while holding lock
bool consumed;
TInput? consumedValue = element.Key.ConsumeMessage(element.Value, _owningTarget, out consumed);
if (consumed)
{
result = new KeyValuePair<TInput, long>(consumedValue!, messageId);
return true;
}
}
if (stateOptimisticallyUpdatedForConsumedMessage)
{
// If we optimistically increased the bounding count, allocated a message ID,
// and noted an outstanding transfer, we need to undo those state changes, now
// that we've failed to consume any message.
_reorderingBuffer?.IgnoreItem(messageId);
if (forPostponementTransfer)
{
lock (IncomingLock)
{
_boundingState!.OutstandingTransfers--;
}
}
ChangeBoundingCount(-1);
}
// Inform the caller that no message could be consumed.
result = default(KeyValuePair<TInput, long>);
return false;
}
/// <summary>Gets whether the target has had cancellation requested or an exception has occurred.</summary>
private bool CanceledOrFaulted
{
get
{
return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || Volatile.Read(ref _exceptions) != null;
}
}
/// <summary>Completes the block once all completion conditions are met.</summary>
private void CompleteBlockIfPossible()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
bool noMoreMessages = _decliningPermanently && _messages.IsEmpty;
if (noMoreMessages || CanceledOrFaulted)
{
CompleteBlockIfPossible_Slow();
}
}
/// <summary>
/// Slow path for CompleteBlockIfPossible.
/// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// </summary>
private void CompleteBlockIfPossible_Slow()
{
Debug.Assert((_decliningPermanently && _messages.IsEmpty) || CanceledOrFaulted, "There must be no more messages.");
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
bool notCurrentlyProcessing = _numberOfOutstandingOperations == 0;
if (notCurrentlyProcessing && !_completionReserved)
{
// Make sure no one else tries to call CompleteBlockOncePossible
_completionReserved = true;
// Make sure the target is declining
_decliningPermanently = true;
// Get out from under currently held locks. This is to avoid
// invoking synchronous continuations off of _completionSource.Task
// while holding a lock.
Task.Factory.StartNew(static state => ((TargetCore<TInput>)state!).CompleteBlockOncePossible(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
/// <summary>
/// Completes the block. This must only be called once, and only once all of the completion conditions are met.
/// As such, it must only be called from CompleteBlockIfPossible.
/// </summary>
private void CompleteBlockOncePossible()
{
// Since the lock is needed only for the Assert, we do this only in DEBUG mode
#if DEBUG
lock (IncomingLock) Debug.Assert(_numberOfOutstandingOperations == 0, "Everything must be done by now.");
#endif
// Release any postponed messages
if (_boundingState != null)
{
// Note: No locks should be held at this point.
Common.ReleaseAllPostponedMessages(_owningTarget, _boundingState.PostponedMessages, ref _exceptions);
}
// For good measure and help in preventing leaks, clear out the incoming message queue,
// which may still contain orphaned data if we were canceled or faulted. However,
// we don't reset the bounding count here, as the block as a whole may still be active.
KeyValuePair<TInput, long> ignored;
IProducerConsumerQueue<KeyValuePair<TInput, long>> messages = _messages;
while (messages.TryDequeue(out ignored)) ;
// If we completed with any unhandled exception, finish in an error state
if (Volatile.Read(ref _exceptions) != null)
{
// It's ok to read _exceptions' content here, because
// at this point no more exceptions can be generated and thus no one will
// be writing to it.
_completionSource.TrySetException(Volatile.Read(ref _exceptions!));
}
// If we completed with cancellation, finish in a canceled state
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
_completionSource.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
// Otherwise, finish in a successful state.
else
{
_completionSource.TrySetResult(default(VoidResult));
}
// We only want to do tracing for block completion if this target core represents the whole block.
// If it only represents a part of the block (i.e. there's a source associated with it as well),
// then we shouldn't log just for the first half of the block; the source half will handle logging.
DataflowEtwProvider etwLog;
if ((_targetCoreOptions & TargetCoreOptions.RepresentsBlockCompletion) != 0 &&
(etwLog = DataflowEtwProvider.Log).IsEnabled())
{
etwLog.DataflowBlockCompleted(_owningTarget);
}
}
/// <summary>Gets whether the target core is operating in a bounded mode.</summary>
internal bool IsBounded { get { return _boundingState != null; } }
/// <summary>Increases or decreases the bounding count.</summary>
/// <param name="count">The incremental addition (positive to increase, negative to decrease).</param>
internal void ChangeBoundingCount(int count)
{
Debug.Assert(count != 0, "Should only be called when the count is actually changing.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
if (_boundingState != null)
{
lock (IncomingLock)
{
Debug.Assert(count > 0 || (count < 0 && _boundingState.CurrentCount + count >= 0),
"If count is negative, it must not take the total count negative.");
_boundingState.CurrentCount += count;
ProcessAsyncIfNecessary();
CompleteBlockIfPossible();
}
}
}
/// <summary>Gets the object to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent
{
get
{
var displayTarget = _owningTarget as IDebuggerDisplay;
return $"Block = \"{(displayTarget != null ? displayTarget.Content : _owningTarget)}\"";
}
}
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this target.</returns>
internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
/// <summary>Provides a wrapper for commonly needed debugging information.</summary>
internal sealed class DebuggingInformation
{
/// <summary>The target being viewed.</summary>
private readonly TargetCore<TInput> _target;
/// <summary>Initializes the debugging helper.</summary>
/// <param name="target">The target being viewed.</param>
internal DebuggingInformation(TargetCore<TInput> target) { _target = target; }
/// <summary>Gets the number of messages waiting to be processed.</summary>
internal int InputCount { get { return _target._messages.Count; } }
/// <summary>Gets the messages waiting to be processed.</summary>
internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(static kvp => kvp.Key).ToList(); } }
/// <summary>Gets any postponed messages.</summary>
internal QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>? PostponedMessages
{
get { return _target._boundingState?.PostponedMessages; }
}
/// <summary>Gets the current number of outstanding input processing operations.</summary>
internal int CurrentDegreeOfParallelism { get { return _target._numberOfOutstandingOperations - _target._numberOfOutstandingServiceTasks; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
internal bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
internal bool IsCompleted { get { return _target.Completion.IsCompleted; } }
}
}
}
|