// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SourceCore.cs
//
//
// The core implementation of a standard ISourceBlock<TOutput>.
//
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
namespace System.Threading.Tasks.Dataflow.Internal
{
// LOCK-LEVELING SCHEME
// --------------------
// SourceCore employs two locks: OutgoingLock and ValueLock. Additionally, targets we call out to
// likely utilize their own IncomingLock. We can hold OutgoingLock while acquiring ValueLock or IncomingLock.
// However, we cannot hold ValueLock while calling out to external code or while acquiring OutgoingLock, and
// we cannot hold IncomingLock when acquiring OutgoingLock. Additionally, the locks employed must be reentrant.
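// A minimal sketch of the permitted nesting, for illustration only (not part of the implementation):
//     lock (OutgoingLock)                 // outermost: may be held while taking ValueLock or a target's IncomingLock
//     {
//         lock (ValueLock)
//         {
//             /* must not call out to external code or take OutgoingLock here */
//         }
//         target.OfferMessage(...);       // the target may take its own IncomingLock under our OutgoingLock
//     }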
/// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
internal sealed class SourceCore<TOutput>
{
// *** These fields are readonly and are initialized to new instances at construction.
/// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>A registry used to store all linked targets and information about them.</summary>
private readonly TargetRegistry<TOutput> _targetRegistry;
/// <summary>The output messages queued up to be received by consumers/targets.</summary>
/// <remarks>
/// The queue is only ever accessed by a single producer and single consumer at a time. On the producer side,
/// we require that AddMessage/AddMessages are the only places the queue is added to, and we require that those
/// methods not be used concurrently with anything else. All of our target halves today follow that restriction;
/// for example, TransformBlock with DOP==1 will have at most a single task processing the user provided delegate,
/// and thus at most one task calling AddMessage. If it has a DOP > 1, it'll go through the ReorderingBuffer,
/// which will use a lock to synchronize the output of all of the processing tasks such that only one is using
/// AddMessage at a time. On the consumer side of SourceCore, all consumption is protected by ValueLock, and thus
/// all consumption is serialized.
/// </remarks>
private readonly SingleProducerSingleConsumerQueue<TOutput> _messages = new SingleProducerSingleConsumerQueue<TOutput>(); // protected by AddMessage/ValueLock
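// Note: rather than allocating dedicated lock objects, the two properties below reuse already-allocated
// readonly fields (_completionTask and _targetRegistry) as the monitors for OutgoingLock and ValueLock, respectively.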
/// <summary>Gets the object to use as the outgoing lock.</summary>
private object OutgoingLock { get { return _completionTask; } }
/// <summary>Gets the object to use as the value lock.</summary>
private object ValueLock { get { return _targetRegistry; } }
// *** These fields are readonly and are initialized by arguments to the constructor.
/// <summary>The source utilizing this helper.</summary>
private readonly ISourceBlock<TOutput> _owningSource;
/// <summary>The options used to configure this block's execution.</summary>
private readonly DataflowBlockOptions _dataflowBlockOptions;
/// <summary>
/// An action to be invoked on the owner block to stop accepting messages.
/// This action is invoked when SourceCore encounters an exception.
/// </summary>
private readonly Action<ISourceBlock<TOutput>> _completeAction;
/// <summary>
/// An action to be invoked on the owner block when an item is removed.
/// This may be null if the owner block doesn't need to be notified.
/// </summary>
private readonly Action<ISourceBlock<TOutput>, int>? _itemsRemovedAction;
/// <summary>Item counting function</summary>
private readonly Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>?, int>? _itemCountingFunc;
// *** These fields are mutated during execution.
/// <summary>The task used to process the output and offer it to targets.</summary>
private Task? _taskForOutputProcessing; // protected by ValueLock
/// <summary>Counter for message IDs unique within this source block.</summary>
private PaddedInt64 _nextMessageId = new PaddedInt64 { Value = 1 }; // We are going to use this value before incrementing. Protected by ValueLock.
/// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
private ITargetBlock<TOutput>? _nextMessageReservedFor; // protected by OutgoingLock
/// <summary>Whether all future messages should be declined.</summary>
private bool _decliningPermanently; // Protected by ValueLock
/// <summary>Whether this block should again attempt to offer messages to targets.</summary>
private bool _enableOffering = true; // Protected by ValueLock, sometimes read with volatile reads
/// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
private bool _completionReserved; // Protected by OutgoingLock
/// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
private List<Exception>? _exceptions; // Protected by ValueLock, sometimes read with volatile reads
/// <summary>Initializes the source core.</summary>
/// <param name="owningSource">The source utilizing this core.</param>
/// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
/// <param name="completeAction">Action to invoke in order to decline the associated target half, which will in turn decline this source core.</param>
/// <param name="itemsRemovedAction">Action to invoke when one or more items is removed. This may be null.</param>
/// <param name="itemCountingFunc">
/// Action to invoke when the owner needs to be able to count the number of individual
/// items in an output or set of outputs.
/// </param>
internal SourceCore(
ISourceBlock<TOutput> owningSource, DataflowBlockOptions dataflowBlockOptions,
Action<ISourceBlock<TOutput>> completeAction,
Action<ISourceBlock<TOutput>, int>? itemsRemovedAction = null,
Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>?, int>? itemCountingFunc = null)
{
Debug.Assert(owningSource != null, "Core must be associated with a source.");
Debug.Assert(dataflowBlockOptions != null, "Options must be provided to configure the core.");
Debug.Assert(completeAction != null, "Action to invoke on completion is required.");
// Store the args
_owningSource = owningSource;
_dataflowBlockOptions = dataflowBlockOptions;
_itemsRemovedAction = itemsRemovedAction;
_itemCountingFunc = itemCountingFunc;
_completeAction = completeAction;
// Construct members that depend on the args
_targetRegistry = new TargetRegistry<TOutput>(_owningSource);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
if (target is null)
{
throw new ArgumentNullException(nameof(target));
}
if (linkOptions is null)
{
throw new ArgumentNullException(nameof(linkOptions));
}
// If the block is already completed, there is not much to do -
// we have to propagate completion if that was requested, and
// then bail without taking the lock.
if (_completionTask.Task.IsCompleted)
{
if (linkOptions.PropagateCompletion) Common.PropagateCompletion(_completionTask.Task, target, exceptionHandler: null);
return Disposables.Nop;
}
lock (OutgoingLock)
{
// If completion has been reserved, the target registry has either been cleared already
// or is about to be cleared. So we can link and offer only if completion is not reserved.
if (!_completionReserved)
{
_targetRegistry.Add(ref target, linkOptions);
OfferToTargets(linkToTarget: target);
return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
}
}
// The block should not offer any messages when it is in this state, but
// it should still propagate completion if that has been requested.
if (linkOptions.PropagateCompletion) Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
return Disposables.Nop;
}
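// Illustrative usage through an owning block (a sketch only; the owning block is assumed to delegate
// its ISourceBlock<TOutput>.LinkTo implementation to this core, as BufferBlock<T> does):
//     var buffer = new BufferBlock<int>();
//     var printer = new ActionBlock<int>(i => Console.WriteLine(i));
//     using (buffer.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true }))
//     {
//         buffer.Post(42);
//     } // disposing the returned IDisposable unlinks the target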
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
internal TOutput? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (target == null) throw new ArgumentNullException(nameof(target));
TOutput? consumedMessageValue = default(TOutput);
lock (OutgoingLock)
{
// If this target doesn't hold the reservation, then for this ConsumeMessage
// to be valid, there must not be any reservation (since otherwise we can't
// consume a message destined for someone else).
if (_nextMessageReservedFor != target &&
_nextMessageReservedFor != null)
{
messageConsumed = false;
return default(TOutput);
}
lock (ValueLock)
{
// If the requested message isn't the next message to be served up, bail.
// Otherwise, we're good to go: dequeue the message, as it will now be owned by the target;
// signal that offering may be re-enabled, since there's potentially a new "next message";
// complete if necessary; and asynchronously offer further messages as appropriate.
if (messageHeader.Id != _nextMessageId.Value ||
!_messages.TryDequeue(out consumedMessageValue))
{
messageConsumed = false;
return default(TOutput);
}
_nextMessageReservedFor = null;
_targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
_enableOffering = true; // reenable offering if it was disabled
_nextMessageId.Value++;
CompleteBlockIfPossible();
OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
}
}
// Notify the owner block that our count has decreased
if (_itemsRemovedAction != null)
{
int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, consumedMessageValue, null) : 1;
_itemsRemovedAction(_owningSource, count);
}
// Return the consumed message value
messageConsumed = true;
return consumedMessageValue;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
internal bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (target == null) throw new ArgumentNullException(nameof(target));
lock (OutgoingLock)
{
// If no one currently holds a reservation...
if (_nextMessageReservedFor == null)
{
lock (ValueLock)
{
// ...and if the requested message is next in the queue, allow it
if (messageHeader.Id == _nextMessageId.Value && !_messages.IsEmpty)
{
_nextMessageReservedFor = target;
_enableOffering = false;
return true;
}
}
}
}
return false;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (target == null) throw new ArgumentNullException(nameof(target));
lock (OutgoingLock)
{
// If someone else holds the reservation, bail.
if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
lock (ValueLock)
{
// If this is not the message at the head of the queue, bail
if (messageHeader.Id != _nextMessageId.Value || _messages.IsEmpty) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
// Otherwise, release the reservation
_nextMessageReservedFor = null;
Debug.Assert(!_enableOffering, "Offering should have been disabled if there was a valid reservation");
_enableOffering = true;
// Now there is at least one message ready for offering. So offer it.
// If a cancellation is pending, this method will bail out.
OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
// This reservation may be holding the block's completion. So try to complete.
CompleteBlockIfPossible();
}
}
}
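// Illustrative sketch of the reservation protocol as a postponing target might drive it against the
// owning ISourceBlock<TOutput> (the names source, target, and postponedHeader are illustrative only):
//     if (source.ReserveMessage(postponedHeader, target))
//     {
//         // Either take ownership of the reserved message...
//         TOutput? value = source.ConsumeMessage(postponedHeader, target, out bool consumed);
//         // ...or hand it back so it can be offered to others again:
//         // source.ReleaseReservation(postponedHeader, target);
//     }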
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion { get { return _completionTask.Task; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
internal bool TryReceive(Predicate<TOutput>? filter, [MaybeNullWhen(false)] out TOutput item)
{
item = default(TOutput);
bool itemReceived = false;
lock (OutgoingLock)
{
// If the next message is reserved for someone, we can't receive right now. Otherwise...
if (_nextMessageReservedFor == null)
{
lock (ValueLock)
{
// If there's at least one message, and there's no filter or the next item
// passes the filter, dequeue it to be returned.
if (_messages.TryDequeueIf(filter, out item))
{
_nextMessageId.Value++;
// Now that the next message has changed, reenable offering if it was disabled
_enableOffering = true;
// If removing this item was the last thing this block will ever do, complete it.
CompleteBlockIfPossible();
// Now, try to offer up messages asynchronously, since we've
// changed what's at the head of the queue
OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
itemReceived = true;
}
}
}
}
if (itemReceived)
{
// Notify the owner block that our count has decreased
if (_itemsRemovedAction != null)
{
int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, item!, null) : 1;
_itemsRemovedAction(_owningSource, count);
}
}
return itemReceived;
}
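// Illustrative usage (a sketch): owning blocks typically surface this via
// IReceivableSourceBlock<TOutput>.TryReceive, e.g.
//     if (buffer.TryReceive(x => x > 0, out int value)) { /* got a positive item */ }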
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
internal bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput>? items)
{
items = null;
int countReceived = 0;
lock (OutgoingLock)
{
// If the next message is reserved for someone, we can't receive right now. Otherwise...
if (_nextMessageReservedFor == null)
{
lock (ValueLock)
{
if (!_messages.IsEmpty)
{
// Receive all of the data, clearing it out in the process.
var tmpList = new List<TOutput>();
TOutput? item;
while (_messages.TryDequeue(out item)) tmpList.Add(item);
countReceived = tmpList.Count;
items = tmpList;
// Increment the next ID. Any new value is good.
_nextMessageId.Value++;
// Now that the next message has changed, reenable offering if it was disabled
_enableOffering = true;
// Now that the block is empty, check to see whether we should complete.
CompleteBlockIfPossible();
}
}
}
}
if (countReceived > 0)
{
// Notify the owner block that our count has decreased
if (_itemsRemovedAction != null)
{
int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, default(TOutput)!, items) : countReceived;
_itemsRemovedAction(_owningSource, count);
}
Debug.Assert(items != null);
return true;
}
return false;
}
/// <summary>Gets the number of items available to be received from this block.</summary>
internal int OutputCount { get { lock (OutgoingLock) lock (ValueLock) return _messages.Count; } }
/// <summary>
/// Adds a message to the source block for propagation.
/// This method must only be used by one thread at a time, and must not be used concurrently
/// with any other producer side methods, e.g. AddMessages, Complete.
/// </summary>
/// <param name="item">The item to be wrapped in a message to be added.</param>
internal void AddMessage(TOutput item)
{
// This method must not take the OutgoingLock, as it will likely be called in situations
// where an IncomingLock is held.
if (_decliningPermanently) return;
_messages.Enqueue(item);
Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
if (_taskForOutputProcessing == null)
{
// Separated out to enable inlining of AddMessage
OfferAsyncIfNecessaryWithValueLock();
}
}
/// <summary>
/// Adds messages to the source block for propagation.
/// This method must only be used by one thread at a time, and must not be used concurrently
/// with any other producer side methods, e.g. AddMessage, Complete.
/// </summary>
/// <param name="items">The list of items to be wrapped in messages to be added.</param>
internal void AddMessages(IEnumerable<TOutput> items)
{
Debug.Assert(items != null, "Items list must be valid.");
// This method must not take the OutgoingLock, as it will likely be called in situations
// where an IncomingLock is held.
if (_decliningPermanently) return;
// Special case arrays and lists, for which we can avoid the
// enumerator allocation that'll result from using a foreach.
// This also avoids virtual method calls that we'd get if we
// didn't special case.
var itemsAsList = items as List<TOutput>;
if (itemsAsList != null)
{
for (int i = 0; i < itemsAsList.Count; i++)
{
_messages.Enqueue(itemsAsList[i]);
}
}
else
{
TOutput[]? itemsAsArray = items as TOutput[];
if (itemsAsArray != null)
{
for (int i = 0; i < itemsAsArray.Length; i++)
{
_messages.Enqueue(itemsAsArray[i]);
}
}
else
{
foreach (TOutput item in items)
{
_messages.Enqueue(item);
}
}
}
Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
if (_taskForOutputProcessing == null)
{
OfferAsyncIfNecessaryWithValueLock();
}
}
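// Illustrative producer-side usage (a sketch): per the remarks on _messages, the owning block is assumed
// to funnel all additions through one caller at a time, e.g. its single processing task (or its
// ReorderingBuffer when the degree of parallelism is greater than 1). userTransform, input, and _source
// are illustrative names:
//     TOutput result = userTransform(input);   // user delegate, run by the owning block's task
//     _source.AddMessage(result);              // never used concurrently with AddMessages/Complete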
/// <summary>Adds an individual exception to this source.</summary>
/// <param name="exception">The exception to add</param>
internal void AddException(Exception exception)
{
Debug.Assert(exception != null, "Valid exception must be provided to be added.");
Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
lock (ValueLock)
{
Common.AddException(ref _exceptions, exception);
}
}
/// <summary>Adds exceptions to this source.</summary>
/// <param name="exceptions">The exceptions to add</param>
internal void AddExceptions(List<Exception> exceptions)
{
Debug.Assert(exceptions != null, "Valid exceptions must be provided to be added.");
Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
lock (ValueLock)
{
foreach (Exception exception in exceptions)
{
Common.AddException(ref _exceptions, exception);
}
}
}
/// <summary>Adds the exceptions contained in an AggregateException to this source.</summary>
/// <param name="aggregateException">The exception to add</param>
internal void AddAndUnwrapAggregateException(AggregateException aggregateException)
{
Debug.Assert(aggregateException != null && aggregateException.InnerExceptions.Count > 0, "Aggregate must be valid and contain inner exceptions to unwrap.");
Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");
lock (ValueLock)
{
Common.AddException(ref _exceptions, aggregateException, unwrapInnerExceptions: true);
}
}
/// <summary>Gets whether the _exceptions list is non-null.</summary>
internal bool HasExceptions
{
get
{
// We may check whether _exceptions is null without taking a lock because we access it with a volatile read
return Volatile.Read(ref _exceptions) != null;
}
}
/// <summary>Informs the block that it will not be receiving additional messages.</summary>
internal void Complete()
{
lock (ValueLock)
{
_decliningPermanently = true;
// Complete may be called in a context where an incoming lock is held. We need to
// call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
// However, we know that _decliningPermanently has been set, and thus the timing of
// CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
// and take the necessary locks in a situation where we're sure it won't cause a problem.
Task.Factory.StartNew(static state =>
{
var thisSourceCore = (SourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
{
lock (thisSourceCore.ValueLock)
{
thisSourceCore.CompleteBlockIfPossible();
}
}
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal DataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
/// <summary>Offers messages to all targets.</summary>
/// <param name="linkToTarget">
/// The newly linked target, if OfferToTargets is being called to synchronously
/// propagate to a target during a LinkTo operation.
/// </param>
private bool OfferToTargets(ITargetBlock<TOutput>? linkToTarget = null)
{
Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
Common.ContractAssertMonitorStatus(ValueLock, held: false);
// If the next message is reserved, we can't offer anything
if (_nextMessageReservedFor != null)
return false;
// Peek at the next message if there is one, so we can offer it.
DataflowMessageHeader header = default(DataflowMessageHeader);
TOutput? message;
bool offerJustToLinkToTarget = false;
// If offering isn't enabled and if we're not doing this as
// a result of LinkTo, bail. Otherwise, with offering disabled, we must have
// already offered this message to all existing targets, so we can just offer
// it to the newly linked target.
if (!Volatile.Read(ref _enableOffering))
{
if (linkToTarget == null) return false;
else offerJustToLinkToTarget = true;
}
// Otherwise, peek at message to offer
if (_messages.TryPeek(out message))
{
header = new DataflowMessageHeader(_nextMessageId.Value);
}
// If there is a message, offer it.
bool messageWasAccepted = false;
if (header.IsValid)
{
if (offerJustToLinkToTarget)
{
// If we've already offered the message to everyone else,
// we can just offer it to the newly linked target
Debug.Assert(linkToTarget != null, "Must have a valid target to offer to.");
OfferMessageToTarget(header, message!, linkToTarget, out messageWasAccepted);
}
else
{
// Otherwise, we've not yet offered this message to anyone, so even
// if linkToTarget is non-null, we need to propagate the message in order
// through all of the registered targets, the last of which will be the linkToTarget
// if it's non-null (no need to special-case it, though).
// Note that during OfferMessageToTarget, a target may call ConsumeMessage (taking advantage of the
// reentrancy of OutgoingLock), which may unlink the target if the target is registered as "unlinkAfterOne".
// Doing so will remove the target from the targets list. As such, we maintain the next node
// separately from cur.Next, in case cur.Next changes by cur being removed from the list.
// No other node in the list should change, as we're protected by OutgoingLock.
TargetRegistry<TOutput>.LinkedTargetInfo? cur = _targetRegistry.FirstTargetNode;
while (cur != null)
{
TargetRegistry<TOutput>.LinkedTargetInfo? next = cur.Next;
if (OfferMessageToTarget(header, message!, cur.Target, out messageWasAccepted)) break;
cur = next;
}
// If none of the targets accepted the message, disable offering.
if (!messageWasAccepted)
{
lock (ValueLock)
{
_enableOffering = false;
}
}
}
}
// If a message got accepted, consume it and reenable offering.
if (messageWasAccepted)
{
lock (ValueLock)
{
// SourceCore passed consumeToAccept: false when offering. However, it's possible
// that an incorrectly written target may ignore that parameter and synchronously consume
// even though it wasn't supposed to. To recover from that,
// we'll only dequeue if the correct message is still at the head of the queue.
// However, we'll assert so that we can at least catch this in our own debug builds.
TOutput? dropped;
if (_nextMessageId.Value != header.Id ||
!_messages.TryDequeue(out dropped)) // remove the next message
{
Debug.Fail("The target did not follow the protocol.");
}
_nextMessageId.Value++;
// The message was accepted, so there's now going to be a new next message.
// If offering had been disabled, reenable it.
_enableOffering = true;
// Now that a message has been removed, we need to complete if possible or
// asynchronously offer if necessary. However, if we're calling this as part of our
// offering loop, we won't be able to do either, since by definition there's already
// a processing task spun up (us) that would prevent these things. So we only
// do the checks if we're being called to link a new target rather than as part
// of normal processing.
if (linkToTarget != null)
{
CompleteBlockIfPossible();
OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
}
}
// Notify the owner block that our count has decreased
if (_itemsRemovedAction != null)
{
int count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, message!, null) : 1;
_itemsRemovedAction(_owningSource, count);
}
}
return messageWasAccepted;
}
/// <summary>Offers the message to the target.</summary>
/// <param name="header">The header of the message to offer.</param>
/// <param name="message">The message being offered.</param>
/// <param name="target">The single target to which the message should be offered.</param>
/// <param name="messageWasAccepted">true if the message was accepted by the target; otherwise, false.</param>
/// <returns>
/// true if the message should not be offered to additional targets;
/// false if propagation should be allowed to continue.
/// </returns>
private bool OfferMessageToTarget(
DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target,
out bool messageWasAccepted)
{
Debug.Assert(target != null, "Valid target to offer to is required.");
Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
Common.ContractAssertMonitorStatus(ValueLock, held: false);
DataflowMessageStatus result = target.OfferMessage(header, message, _owningSource, consumeToAccept: false);
Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages are not being offered concurrently, so nothing should be missed.");
messageWasAccepted = false;
// If accepted, note it, and if the target was linked as "once", remove it
if (result == DataflowMessageStatus.Accepted)
{
_targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
messageWasAccepted = true;
return true; // the message should not be offered to anyone else
}
// If declined permanently, remove the target
else if (result == DataflowMessageStatus.DecliningPermanently)
{
_targetRegistry.Remove(target);
}
// If the message was reserved by the target, stop propagating
else if (_nextMessageReservedFor != null)
{
Debug.Assert(result == DataflowMessageStatus.Postponed,
"If the message was reserved, it should also have been postponed.");
return true; // the message should not be offered to anyone else
}
// If the result was Declined, there's nothing more to be done.
// This message will sit at the front of the queue until someone claims it.
return false; // allow the message to be offered to someone else
}
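// Summary of how the OfferMessage result maps to the behavior above:
//     Accepted                 -> messageWasAccepted = true; stop offering (return true)
//     DecliningPermanently     -> unlink the target; keep offering to others (return false)
//     Postponed (and reserved) -> stop offering (return true)
//     Postponed / Declined     -> keep offering to others (return false)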
/// <summary>
/// Called when we want to enable asynchronously offering message to targets.
/// Takes the ValueLock before delegating to OfferAsyncIfNecessary.
/// </summary>
private void OfferAsyncIfNecessaryWithValueLock()
{
lock (ValueLock)
{
OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: false);
}
}
/// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
private void OfferAsyncIfNecessary(bool isReplacementReplica, bool outgoingLockKnownAcquired)
{
Common.ContractAssertMonitorStatus(ValueLock, held: true);
// Fast path to enable OfferAsyncIfNecessary to be inlined. We only need
// to proceed if there's no task processing, offering is enabled, and
// there are messages waiting to be processed.
if (_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty)
{
// Slow path: do additional checks and potentially launch new task
OfferAsyncIfNecessary_Slow(isReplacementReplica, outgoingLockKnownAcquired);
}
}
/// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
private void OfferAsyncIfNecessary_Slow(bool isReplacementReplica, bool outgoingLockKnownAcquired)
{
Common.ContractAssertMonitorStatus(ValueLock, held: true);
Debug.Assert(_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty,
"The block must be enabled for offering, not currently be processing, and have messages available to process.");
// This method must not take the outgoing lock, as it will likely be called in situations
// where a derived type's incoming lock is held.
bool targetsAvailable = true;
if (outgoingLockKnownAcquired || Monitor.IsEntered(OutgoingLock))
{
Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
targetsAvailable = _targetRegistry.FirstTargetNode != null;
}
// If there's any work to be done...
if (targetsAvailable && !CanceledOrFaulted)
{
// Create task and store into _taskForOutputProcessing prior to scheduling the task
// so that _taskForOutputProcessing will be visibly set in the task loop.
_taskForOutputProcessing = new Task(static thisSourceCore => ((SourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
_owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
}
// Start the task handling scheduling exceptions
Exception? exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
if (exception != null)
{
// First, log the exception while the processing state is dirty, which is preventing the block from completing.
// Then revert the proactive processing state changes.
// And last, try to complete the block.
AddException(exception);
_taskForOutputProcessing = null;
_decliningPermanently = true;
// Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
// Re-take the locks on a separate thread.
Task.Factory.StartNew(static state =>
{
var thisSourceCore = (SourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
{
lock (thisSourceCore.ValueLock)
{
thisSourceCore.CompleteBlockIfPossible();
}
}
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
}
/// <summary>Task body used to process messages.</summary>
private void OfferMessagesLoopCore()
{
Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
"Must be part of the current processing task.");
try
{
int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
// We need to hold the outgoing lock while offering messages. We can either
// lock and unlock for each individual offering, or we can lock around multiple or all
// possible offerings. The former ensures that other operations don't get starved,
// while the latter is much more efficient (not continually acquiring and releasing
// the lock). For blocks that aren't linked to any targets, this won't matter
// (no offering is done), and for blocks that are only linked to targets, this shouldn't
// matter (no one is contending for the lock), thus
// the only case it would matter is when a block both has targets and is being
// explicitly received from, which is an uncommon scenario. Thus, we want to lock
// around the whole thing to improve performance, but just in case we do hit
// an uncommon scenario, in the default case we release the lock every now and again.
// If a developer wants to control this, they can limit the duration of the
// lock by using MaxMessagesPerTask.
const int DEFAULT_RELEASE_LOCK_ITERATIONS = 10; // Dialable
int releaseLockIterations =
_dataflowBlockOptions.MaxMessagesPerTask == DataflowBlockOptions.Unbounded ?
DEFAULT_RELEASE_LOCK_ITERATIONS : maxMessagesPerTask;
for (int messageCounter = 0;
messageCounter < maxMessagesPerTask && !CanceledOrFaulted;)
{
lock (OutgoingLock)
{
// While there are more messages to process, offer each in turn
// to the targets. If we're unable to propagate a particular message,
// stop trying until something changes in the future.
for (
int lockReleaseCounter = 0;
messageCounter < maxMessagesPerTask && lockReleaseCounter < releaseLockIterations && !CanceledOrFaulted;
++messageCounter, ++lockReleaseCounter)
{
if (!OfferToTargets()) return;
}
}
}
}
catch (Exception exc)
{
// Record the exception
AddException(exc);
// Notify the owning block it should stop accepting new messages
_completeAction(_owningSource);
}
finally
{
lock (OutgoingLock)
{
lock (ValueLock)
{
// We're no longer processing, so null out the processing task
Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
"Must be part of the current processing task.");
_taskForOutputProcessing = null;
Interlocked.MemoryBarrier(); // synchronize with AddMessage(s) and its read of _taskForOutputProcessing
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
OfferAsyncIfNecessary(isReplacementReplica: true, outgoingLockKnownAcquired: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteBlockIfPossible();
}
}
}
}
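// Illustrative configuration (a sketch): a user can bound how long a single offering task holds the
// OutgoingLock (at the cost of extra task scheduling) via MaxMessagesPerTask, e.g.
//     var buffer = new BufferBlock<int>(new DataflowBlockOptions { MaxMessagesPerTask = 1 });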
/// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
private bool CanceledOrFaulted
{
get
{
// Cancellation is honored as soon as the CancellationToken has been signaled.
// Faulting is honored after an exception has been encountered and the owning block
// has invoked Complete on us.
return _dataflowBlockOptions.CancellationToken.IsCancellationRequested ||
(HasExceptions && _decliningPermanently);
}
}
/// <summary>Completes the block's processing if there's nothing left to do and never will be.</summary>
private void CompleteBlockIfPossible()
{
Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
Common.ContractAssertMonitorStatus(ValueLock, held: true);
if (!_completionReserved)
{
if (_decliningPermanently && // declining permanently, so no more messages will arrive
_taskForOutputProcessing == null && // no current processing
_nextMessageReservedFor == null) // no pending reservation
{
CompleteBlockIfPossible_Slow();
}
}
}
/// <summary>
/// Slow path for CompleteBlockIfPossible.
/// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
/// </summary>
private void CompleteBlockIfPossible_Slow()
{
Debug.Assert(
_decliningPermanently && _taskForOutputProcessing == null && _nextMessageReservedFor == null,
"The block must be declining permanently, there must be no reservations, and there must be no processing tasks");
Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
Common.ContractAssertMonitorStatus(ValueLock, held: true);
if (_messages.IsEmpty || CanceledOrFaulted)
{
_completionReserved = true;
// Get out from under currently held locks. This is to avoid
// invoking synchronous continuations off of _completionTask.Task
// while holding a lock.
Task.Factory.StartNew(static state => ((SourceCore<TOutput>)state!).CompleteBlockOncePossible(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
/// <summary>
/// Completes the block. This must only be called once, and only once all of the completion conditions are met.
/// As such, it must only be called from CompleteBlockIfPossible.
/// </summary>
private void CompleteBlockOncePossible()
{
TargetRegistry<TOutput>.LinkedTargetInfo? linkedTargets;
List<Exception>? exceptions;
// Avoid completing while the code that caused this completion to occur is still holding a lock.
// Clear out the target registry and buffers to help avoid memory leaks.
lock (OutgoingLock)
{
// Save the linked list of targets so that it could be traversed later to propagate completion
linkedTargets = _targetRegistry.ClearEntryPoints();
lock (ValueLock)
{
_messages.Clear();
// Save a local reference to the exceptions list and null out the field,
// so that if the target side tries to add an exception this late,
// it will go to a separate list (which will be ignored).
exceptions = _exceptions;
_exceptions = null;
}
}
// If it's due to an unhandled exception, finish in an error state
if (exceptions != null)
{
_completionTask.TrySetException(exceptions);
}
// If it's due to cancellation, finish in a canceled state
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
_completionTask.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
// Otherwise, finish in a successful state.
else
{
_completionTask.TrySetResult(default(VoidResult));
}
// Now that the completion task is completed, we may propagate completion to the linked targets
_targetRegistry.PropagateCompletion(linkedTargets);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCompleted(_owningSource);
}
}
/// <summary>Gets the object to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent
{
get
{
var displaySource = _owningSource as IDebuggerDisplay;
return $"Block = \"{(displaySource != null ? displaySource.Content : _owningSource)}\"";
}
}
/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this source core.</returns>
internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
/// <summary>Provides debugging information about the source core.</summary>
internal sealed class DebuggingInformation
{
/// <summary>The source being viewed.</summary>
private readonly SourceCore<TOutput> _source;
/// <summary>Initializes the type proxy.</summary>
/// <param name="source">The source being viewed.</param>
internal DebuggingInformation(SourceCore<TOutput> source) { _source = source; }
/// <summary>Gets the number of messages available for receiving.</summary>
internal int OutputCount { get { return _source._messages.Count; } }
/// <summary>Gets the messages available for receiving.</summary>
internal IEnumerable<TOutput> OutputQueue { get { return _source._messages.ToList(); } }
/// <summary>Gets the task being used for output processing.</summary>
internal Task? TaskForOutputProcessing { get { return _source._taskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal DataflowBlockOptions DataflowBlockOptions { get { return _source._dataflowBlockOptions; } }
/// <summary>Gets whether the block is completed.</summary>
internal bool IsCompleted { get { return _source.Completion.IsCompleted; } }
/// <summary>Gets the set of all targets linked from this block.</summary>
internal TargetRegistry<TOutput> LinkedTargets { get { return _source._targetRegistry; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
internal ITargetBlock<TOutput>? NextMessageReservedFor { get { return _source._nextMessageReservedFor; } }
}
}
}