// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SpscTargetCore.cs
//
//
// A fast single-producer-single-consumer core for a target block.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Security;
namespace System.Threading.Tasks.Dataflow.Internal
{
// SpscTargetCore provides a fast target core for use in blocks that will only have single-producer-single-consumer
// semantics. Blocks configured with the default DOP==1 will be single consumer, so whether this core may be
// used is largely up to whether the block is also single-producer. The ExecutionDataflowBlockOptions.SingleProducerConstrained
// option can be used by a developer to inform a block that it will only be accessed by one producer at a time,
// and a block like ActionBlock can utilize that knowledge to choose this target instead of the default TargetCore.
// However, there are further constraints that might prevent this core from being used.
// - If the user specifies a CancellationToken, this core can't be used, as the cancellation request
// could come in concurrently with the single producer accessing the block, thus resulting in multiple producers.
// - If the user specifies a bounding capacity, this core can't be used, as the consumer processing items
// needs to synchronize with producers around the change in bounding count, and the consumer is again
// in effect another producer.
// - If the block has a source half (e.g. TransformBlock) and that source could potentially call back
// to the target half to, for example, notify it of exceptions occurring, again there would potentially
// be multiple producers.
// Thus, when and how this SpscTargetCore may be applied is significantly constrained.
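// As an illustrative sketch (not taken from this file), a block configured like the following, with no
// CancellationToken and no BoundedCapacity, is a candidate for this core:
//
//     var block = new ActionBlock<int>(
//         i => Console.WriteLine(i),
//         new ExecutionDataflowBlockOptions { SingleProducerConstrained = true });
//     block.Post(1);            // the single producer posts data
//     block.Complete();         // and eventually signals completion
//     block.Completion.Wait();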
/// <summary>
/// Provides a core implementation of <see cref="ITargetBlock{TInput}"/> for use when there's only a single producer posting data.
/// </summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="SpscTargetCore{TInput}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
internal sealed class SpscTargetCore<TInput>
{
/// <summary>The target block using this helper.</summary>
private readonly ITargetBlock<TInput> _owningTarget;
/// <summary>The messages in this target.</summary>
private readonly SingleProducerSingleConsumerQueue<TInput> _messages = new SingleProducerSingleConsumerQueue<TInput>();
/// <summary>The options to use to configure this block. The target core assumes these options are immutable.</summary>
private readonly ExecutionDataflowBlockOptions _dataflowBlockOptions;
/// <summary>An action to invoke for every accepted message.</summary>
private readonly Action<TInput> _action;
/// <summary>Exceptions that may have occurred and gone unhandled during processing. This field is lazily initialized.</summary>
private volatile List<Exception>? _exceptions;
/// <summary>Whether to stop accepting new messages.</summary>
private volatile bool _decliningPermanently;
/// <summary>Whether a task has reserved the right to run the completion routine.</summary>
private volatile bool _completionReserved;
/// <summary>
/// The Task currently active to process the block. This field is used to synchronize between producer and consumer,
/// and it should not be set to null once the block completes, as doing so would allow for races where the producer
/// gets another consumer task queued even though the block has completed.
/// </summary>
private volatile Task? _activeConsumer;
/// <summary>A task representing the completion of the block. This field is lazily initialized.</summary>
private TaskCompletionSource<VoidResult>? _completionTask;
/// <summary>Initialize the SPSC target core.</summary>
/// <param name="owningTarget">The owning target block.</param>
/// <param name="action">The action to be invoked for every message.</param>
/// <param name="dataflowBlockOptions">The options to use to configure this block. The target core assumes these options are immutable.</param>
internal SpscTargetCore(
ITargetBlock<TInput> owningTarget, Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
Debug.Assert(owningTarget != null, "Expected non-null owningTarget");
Debug.Assert(action != null, "Expected non-null action");
Debug.Assert(dataflowBlockOptions != null, "Expected non-null dataflowBlockOptions");
_owningTarget = owningTarget;
_action = action;
_dataflowBlockOptions = dataflowBlockOptions;
}
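/// <summary>Posts an item to this target core, scheduling a consumer task if one isn't already running.</summary>
/// <param name="messageValue">The value to post.</param>
/// <returns>true if the message was accepted; false if the target is declining permanently.</returns>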
internal bool Post(TInput messageValue)
{
if (_decliningPermanently)
return false;
// Store the offered message into the queue.
_messages.Enqueue(messageValue);
Interlocked.MemoryBarrier(); // ensure the read of _activeConsumer doesn't move up before the writes in Enqueue
// Make sure there's an active task available to handle processing this message. If we find the task
// is null, we'll try to schedule one using an interlocked operation. If we find the task is non-null,
// then a consumer is actively running (or the block has completed, in which case the field is never
// reset to null). If there's a race where the running task is about to exit and nulls out its reference
// (using a barrier), it subsequently double-checks whether any messages remain in the queue; since we
// enqueued our message before this check, it will find it and use an interlocked operation to schedule
// a replacement consumer.
if (_activeConsumer == null)
{
ScheduleConsumerIfNecessary(isReplica: false);
}
return true;
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput>? source, bool consumeToAccept)
{
// If we're not required to go back to the source to consume the offered message, try the fast path.
return !consumeToAccept && Post(messageValue) ?
DataflowMessageStatus.Accepted :
OfferMessage_Slow(messageHeader, messageValue, source, consumeToAccept);
}
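// For illustration (hypothetical caller, not part of this file): a direct, unreserved offer such as
//
//     ITargetBlock<int> target = new ActionBlock<int>(
//         i => { }, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true });
//     target.OfferMessage(new DataflowMessageHeader(1), 42, source: null, consumeToAccept: false);
//
// takes the fast Post-based path above, while offers that require consuming from the source fall
// through to OfferMessage_Slow below.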
/// <summary>Implements the slow path for OfferMessage.</summary>
/// <param name="messageHeader">The message header for the offered value.</param>
/// <param name="messageValue">The offered value.</param>
/// <param name="source">The source offering the message. This may be null.</param>
/// <param name="consumeToAccept">true if we need to call back to the source to consume the message; otherwise, false if we can simply accept it directly.</param>
/// <returns>The status of the message.</returns>
private DataflowMessageStatus OfferMessage_Slow(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput>? source, bool consumeToAccept)
{
// If we're declining permanently, let the caller know.
if (_decliningPermanently)
{
return DataflowMessageStatus.DecliningPermanently;
}
// If the message header is invalid, throw.
if (!messageHeader.IsValid)
{
throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
}
// If the caller has requested we consume the message using ConsumeMessage, do so.
if (consumeToAccept)
{
if (source == null) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, _owningTarget, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
// See the "fast path" comments in Post
_messages.Enqueue(messageValue!);
Interlocked.MemoryBarrier(); // ensure the read of _activeConsumer doesn't move up before the writes in Enqueue
if (_activeConsumer == null)
{
ScheduleConsumerIfNecessary(isReplica: false);
}
return DataflowMessageStatus.Accepted;
}
/// <summary>Schedules a consumer task if there's none currently running.</summary>
/// <param name="isReplica">Whether the new consumer is being scheduled to replace a currently running consumer.</param>
private void ScheduleConsumerIfNecessary(bool isReplica)
{
// If there's currently no active task...
if (_activeConsumer == null)
{
// Create a new consumption task and try to set it as current as long as there's still no other task
var newConsumer = new Task(
static state => ((SpscTargetCore<TInput>)state!).ProcessMessagesLoopCore(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(isReplica));
if (Interlocked.CompareExchange(ref _activeConsumer, newConsumer, null) == null)
{
// We won the race. This task is now the consumer.
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
_owningTarget, newConsumer, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _messages.Count);
}
// Start the task. In the erroneous case where the scheduler throws an exception,
// just allow it to propagate. Our other option would be to fault the block with
// that exception, but in order for the block to complete we need to schedule a consumer
// task to do so, and it's very likely that if the scheduler is throwing an exception
// now, it would do so again.
newConsumer.Start(_dataflowBlockOptions.TaskScheduler);
}
}
}
/// <summary>Task body used to process messages.</summary>
private void ProcessMessagesLoopCore()
{
Debug.Assert(
_activeConsumer != null && _activeConsumer.Id == Task.CurrentId,
"This method should only be called when it's the active consumer.");
int messagesProcessed = 0;
int maxMessagesToProcess = _dataflowBlockOptions.ActualMaxMessagesPerTask;
// Continue processing as long as there's more processing to be done
bool continueProcessing = true;
while (continueProcessing)
{
continueProcessing = false;
TInput? nextMessage = default(TInput);
try
{
// While there are more messages to be processed, process each one.
// NOTE: This loop is critical for performance. It must be super lean.
while (
_exceptions == null &&
messagesProcessed < maxMessagesToProcess &&
_messages.TryDequeue(out nextMessage))
{
messagesProcessed++; // done before _action invoked in case it throws exception
_action(nextMessage);
}
}
catch (Exception exc)
{
// If the exception is for cancellation, just ignore it.
// Otherwise, store it, and the finally block will handle completion.
if (!Common.IsCooperativeCancellation(exc))
{
_decliningPermanently = true; // stop accepting from producers
Common.StoreDataflowMessageValueIntoExceptionData<TInput>(exc, nextMessage!, false);
StoreException(exc);
}
}
finally
{
// If more messages just arrived and we should still process them,
// loop back around and keep going.
if (!_messages.IsEmpty && _exceptions == null && (messagesProcessed < maxMessagesToProcess))
{
continueProcessing = true;
}
else
{
// If messages are being declined and we're empty, or if there's an exception,
// then there's no more work to be done and we should complete the block.
bool wasDecliningPermanently = _decliningPermanently;
if ((wasDecliningPermanently && _messages.IsEmpty) || _exceptions != null)
{
// Complete the block, as long as we're not already completing or completed.
if (!_completionReserved) // no synchronization necessary; this can't happen concurrently
{
_completionReserved = true;
CompleteBlockOncePossible();
}
}
else
{
// Mark that we're exiting.
Task? previousConsumer = Interlocked.Exchange<Task?>(ref _activeConsumer, null);
Debug.Assert(previousConsumer != null && previousConsumer.Id == Task.CurrentId,
"The running task should have been denoted as the active task.");
// Now that we're no longer the active task, double
// check to make sure there's really nothing to do,
// which could include processing more messages or completing.
// If there is more to do, schedule a task to try to do it.
// This is to handle a race with Post/Complete/Fault and this
// task completing.
if (!_messages.IsEmpty || // messages to be processed
(!wasDecliningPermanently && _decliningPermanently) || // potentially completion to be processed
_exceptions != null) // exceptions/completion to be processed
{
ScheduleConsumerIfNecessary(isReplica: true);
}
}
}
}
}
}
/// <summary>Gets the number of messages waiting to be processed.</summary>
internal int InputCount { get { return _messages.Count; } }
/// <summary>
/// Completes the target core. If an exception is provided, the block will end up in a faulted state.
/// If Complete is invoked more than once, or if it's invoked after the block is already
/// completing, all invocations after the first are ignored.
/// </summary>
/// <param name="exception">The exception to be stored.</param>
internal void Complete(Exception? exception)
{
// If we're not yet declining permanently...
if (!_decliningPermanently)
{
// Mark us as declining permanently, and then kick off a processing task
// if we need one. It's this processing task's job to complete the block
// once all data has been consumed and/or we're in a valid state for completion.
if (exception != null) StoreException(exception);
_decliningPermanently = true;
ScheduleConsumerIfNecessary(isReplica: false);
}
}
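// Illustrative effect as observed through an owning block (hypothetical usage, not part of this file):
//
//     var block = new ActionBlock<int>(
//         i => { }, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true });
//     block.Post(1);                  // accepted while the core is not declining
//     block.Complete();               // forwarded here; marks the core as declining and schedules a consumer
//     bool accepted = block.Post(2);  // false: the core is now declining permanently
//     block.Completion.Wait();        // completes once the already-queued message has been processed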
/// <summary>
/// Ensures the exceptions list is initialized and stores the exception into the list using a lock.
/// </summary>
/// <param name="exception">The exception to store.</param>
private void StoreException(Exception exception)
{
// Ensure that the _exceptions field has been initialized.
// We need to synchronize the initialization and storing of
// the exception because this method could be accessed concurrently
// by the producer and consumer, a producer calling Fault and the
// processing task processing the user delegate which might throw.
#pragma warning disable 0420
lock (LazyInitializer.EnsureInitialized(ref _exceptions, static () => new List<Exception>()))
#pragma warning restore 0420
{
_exceptions.Add(exception);
}
}
/// <summary>
/// Completes the block. This must only be called once, and only once all of the completion conditions are met.
/// </summary>
private void CompleteBlockOncePossible()
{
Debug.Assert(_completionReserved, "Should only invoke once completion has been reserved.");
// Dump any messages that might remain in the queue, which could happen if we completed due to exceptions.
while (_messages.TryDequeue(out _)) ;
// Complete the completion task
bool result;
if (_exceptions != null)
{
Exception[] exceptions;
lock (_exceptions) exceptions = _exceptions.ToArray();
result = CompletionSource.TrySetException(exceptions);
}
else
{
result = CompletionSource.TrySetResult(default(VoidResult));
}
Debug.Assert(result, "Expected completion task to not yet be completed");
// We explicitly do not set _activeConsumer back to null here, as that would
// allow for races where a producer calling OfferMessage could end up
// seeing _activeConsumer as null and queueing a new consumer task even
// though the block has completed.
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCompleted(_owningTarget);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
internal Task Completion { get { return CompletionSource.Task; } }
/// <summary>Gets the lazily-initialized completion source.</summary>
private TaskCompletionSource<VoidResult> CompletionSource
{
get { return LazyInitializer.EnsureInitialized(ref _completionTask, static () => new TaskCompletionSource<VoidResult>()); }
}
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _dataflowBlockOptions; } }
/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this target.</returns>
internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
/// <summary>Gets the object to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent
{
get
{
var displayTarget = _owningTarget as IDebuggerDisplay;
return $"Block = \"{(displayTarget != null ? displayTarget.Content : _owningTarget)}\"";
}
}
/// <summary>Provides a wrapper for commonly needed debugging information.</summary>
internal sealed class DebuggingInformation
{
/// <summary>The target being viewed.</summary>
private readonly SpscTargetCore<TInput> _target;
/// <summary>Initializes the debugging helper.</summary>
/// <param name="target">The target being viewed.</param>
internal DebuggingInformation(SpscTargetCore<TInput> target) { _target = target; }
/// <summary>Gets the messages waiting to be processed.</summary>
internal IEnumerable<TInput> InputQueue { get { return _target._messages.ToList(); } }
/// <summary>Gets the current number of outstanding input processing operations.</summary>
internal int CurrentDegreeOfParallelism { get { return _target._activeConsumer != null && !_target.Completion.IsCompleted ? 1 : 0; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
internal bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
internal bool IsCompleted { get { return _target.Completion.IsCompleted; } }
}
}
}