File: Internal\Common.cs
Web Access
Project: src\src\libraries\System.Threading.Tasks.Dataflow\src\System.Threading.Tasks.Dataflow.csproj (System.Threading.Tasks.Dataflow)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Common.cs
//
//
// Helper routines for the rest of the TPL Dataflow implementation.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.ExceptionServices;
 
namespace System.Threading.Tasks.Dataflow.Internal
{
    /// <summary>Internal helper utilities.</summary>
    internal static class Common
    {
        /// <summary>
        /// An invalid ID to assign for reordering purposes.  This value is chosen to be the last of the 64-bit integers that
        /// could ever be assigned as a reordering ID.
        /// </summary>
        internal const long INVALID_REORDERING_ID = -1;
        /// <summary>A well-known message ID for code that will send exactly one message or
        /// where the exact message ID is not important.</summary>
        internal const int SINGLE_MESSAGE_ID = 1;
        /// <summary>A perf optimization for caching a well-known message header instead of
        /// constructing one every time it is needed.</summary>
        internal static readonly DataflowMessageHeader SingleMessageHeader = new DataflowMessageHeader(SINGLE_MESSAGE_ID);
        /// <summary>The cached completed Task{bool} with a result of true.</summary>
        internal static readonly Task<bool> CompletedTaskWithTrueResult = CreateCachedBooleanTask(true);
        /// <summary>The cached completed Task{bool} with a result of false.</summary>
        internal static readonly Task<bool> CompletedTaskWithFalseResult = CreateCachedBooleanTask(false);
        /// <summary>The cached completed TaskCompletionSource{VoidResult}.</summary>
        internal static readonly TaskCompletionSource<VoidResult> CompletedVoidResultTaskCompletionSource = CreateCachedTaskCompletionSource<VoidResult>();
 
        /// <summary>Asserts that a given synchronization object is either held or not held.</summary>
        /// <param name="syncObj">The monitor to check.</param>
        /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
        [Conditional("DEBUG")]
        internal static void ContractAssertMonitorStatus(object syncObj, bool held)
        {
            Debug.Assert(syncObj != null, "The monitor object to check must be provided.");
            Debug.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
        }
 
        /// <summary>Keeping alive processing tasks: maximum number of processed messages.</summary>
        internal const int KEEP_ALIVE_NUMBER_OF_MESSAGES_THRESHOLD = 1;
        /// <summary>Keeping alive processing tasks: do not attempt this many times.</summary>
        internal const int KEEP_ALIVE_BAN_COUNT = 1000;
 
        /// <summary>A predicate type for TryKeepAliveUntil.</summary>
        /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
        /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
        /// <returns>The state of the predicate.</returns>
        internal delegate bool KeepAlivePredicate<TStateIn, TStateOut>(TStateIn stateIn, out TStateOut stateOut);
 
        /// <summary>Actively waits for a predicate to become true.</summary>
        /// <param name="predicate">The predicate to become true.</param>
        /// <param name="stateIn">Input state for the predicate in order to avoid closure allocations.</param>
        /// <param name="stateOut">Output state for the predicate in order to avoid closure allocations.</param>
        /// <returns>True if the predicate was evaluated and it returned true. False otherwise.</returns>
        internal static bool TryKeepAliveUntil<TStateIn, TStateOut>(KeepAlivePredicate<TStateIn, TStateOut> predicate,
                                                                    TStateIn stateIn, [MaybeNullWhen(false)] out TStateOut stateOut)
        {
            Debug.Assert(predicate != null, "Non-null predicate to execute is required.");
            const int ITERATION_LIMIT = 16;
 
            for (int c = ITERATION_LIMIT; c > 0; c--)
            {
                if (!Thread.Yield())
                {
                    // There was no other thread waiting.
                    // We may spend some more cycles to evaluate the predicate.
                    if (predicate(stateIn, out stateOut)) return true;
                }
            }
 
            stateOut = default(TStateOut);
            return false;
        }
 
        /// <summary>Unwraps an instance T from object state that is a WeakReference to that instance.</summary>
        /// <typeparam name="T">The type of the data to be unwrapped.</typeparam>
        /// <param name="state">The weak reference.</param>
        /// <returns>The T instance.</returns>
        internal static T? UnwrapWeakReference<T>(object state) where T : class
        {
            var wr = state as WeakReference<T>;
            Debug.Assert(wr != null, "Expected a WeakReference<T> as the state argument");
            return wr.TryGetTarget(out T? item) ? item : null;
        }
 
        /// <summary>Gets an ID for the dataflow block.</summary>
        /// <param name="block">The dataflow block.</param>
        /// <returns>An ID for the dataflow block.</returns>
        internal static int GetBlockId(IDataflowBlock block)
        {
            Debug.Assert(block != null, "Block required to extract an Id.");
            const int NOTASKID = 0; // tasks don't have 0 as ids
            Task? t = Common.GetPotentiallyNotSupportedCompletionTask(block);
            return t != null ? t.Id : NOTASKID;
        }
 
        /// <summary>Gets the name for the specified block, suitable to be rendered in a debugger window.</summary>
        /// <param name="block">The block for which a name is needed.</param>
        /// <param name="options">
        /// The options to use when rendering the name. If no options are provided, the block's name is used directly.
        /// </param>
        /// <returns>The name of the object.</returns>
        /// <remarks>This is used from DebuggerDisplay attributes.</remarks>
        internal static string GetNameForDebugger(
            IDataflowBlock block, DataflowBlockOptions? options = null)
        {
            Debug.Assert(block != null, "Should only be used with valid objects being displayed in the debugger.");
            Debug.Assert(options == null || options.NameFormat != null, "If options are provided, NameFormat must be valid.");
 
            if (block == null) return string.Empty;
 
            string blockName = block.GetType().Name;
            if (options == null) return blockName;
 
            // {0} == block name
            // {1} == block id
            int blockId = GetBlockId(block);
 
            // Since NameFormat is public, formatting may throw if the user has set
            // a string that contains a reference to an argument higher than {1}.
            // In the case of an exception, show the exception message.
            try
            {
                return string.Format(options.NameFormat, blockName, blockId);
            }
            catch (Exception exception)
            {
                return exception.Message;
            }
        }
 
        /// <summary>
        /// Gets whether the exception represents a cooperative cancellation acknowledgment.
        /// </summary>
        /// <param name="exception">The exception to check.</param>
        /// <returns>true if this exception represents a cooperative cancellation acknowledgment; otherwise, false.</returns>
        internal static bool IsCooperativeCancellation(Exception exception)
        {
            Debug.Assert(exception != null, "An exception to check for cancellation must be provided.");
            return exception is OperationCanceledException;
            // Note that the behavior of this method does not exactly match that of Parallel.*, PLINQ, and Task.Factory.StartNew,
            // in that it's more liberal and treats any OCE as acknowledgment of cancellation; in contrast, the other
            // libraries only treat OCEs as such if they contain the same token that was provided by the user
            // and if that token has cancellation requested.  Such logic could be achieved here with:
            //   var oce = exception as OperationCanceledException;
            //   return oce != null &&
            //          oce.CancellationToken == dataflowBlockOptions.CancellationToken &&
            //          oce.CancellationToken.IsCancellationRequested;
            // However, that leads to a discrepancy with the async processing case of dataflow blocks,
            // where tasks are returned to represent the message processing, potentially in the Canceled state,
            // and we simply ignore such tasks.  Further, for blocks like TransformBlock, it's useful to be able
            // to cancel an individual operation which must return a TOutput value, simply by throwing an OperationCanceledException.
            // In such cases, you wouldn't want cancellation tied to the token, because you would only be able to
            // cancel an individual message processing if the whole block was canceled.
        }
 
        /// <summary>Registers a block for cancellation by completing when cancellation is requested.</summary>
        /// <param name="cancellationToken">The block's cancellation token.</param>
        /// <param name="completionTask">The task that will complete when the block is completely done processing.</param>
        /// <param name="completeAction">An action that will decline permanently on the state passed to it.</param>
        /// <param name="completeState">The block on which to decline permanently.</param>
        internal static void WireCancellationToComplete(
            CancellationToken cancellationToken, Task completionTask, Action<object?, CancellationToken> completeAction, object completeState)
        {
            Debug.Assert(completionTask != null, "A task to wire up for completion is needed.");
            Debug.Assert(completeAction != null, "An action to invoke upon cancellation is required.");
 
            if (cancellationToken.IsCancellationRequested)
            {
                // If a cancellation request has already occurred, just invoke the declining action synchronously.
                // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
                completeAction(completeState, cancellationToken);
            }
            else if (cancellationToken.CanBeCanceled)
            {
                // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
                // data, and we also want to dispose of that registration when we complete so that we don't
                // leak into a long-living cancellation token.
                CancellationTokenRegistration reg = cancellationToken.Register(
#if NET6_0_OR_GREATER
                    completeAction, completeState
#else
                    state =>
                    {
                        var tuple = (Tuple<Action<object?, CancellationToken>, object, CancellationToken>)state!;
                        tuple.Item1(tuple.Item2, tuple.Item3);
                    },
                    Tuple.Create(completeAction, completeState, cancellationToken)
#endif
                    );
                completionTask.ContinueWith(static (completed, state) => ((CancellationTokenRegistration)state!).Dispose(),
                    reg, cancellationToken, Common.GetContinuationOptions(), TaskScheduler.Default);
            }
        }
 
        /// <summary>Initializes the stack trace and watson bucket of an inactive exception.</summary>
        /// <param name="exception">The exception to initialize.</param>
        /// <returns>The initialized exception.</returns>
        internal static Exception InitializeStackTrace(Exception exception)
        {
            Debug.Assert(exception != null && exception.StackTrace == null,
                "A valid but uninitialized exception should be provided.");
            try { throw exception; }
            catch { return exception; }
        }
 
        /// <summary>The name of the key in an Exception's Data collection used to store information on a dataflow message.</summary>
        internal const string EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE = "DataflowMessageValue"; // should not be localized
 
        /// <summary>Stores details on a dataflow message into an Exception's Data collection.</summary>
        /// <typeparam name="T">Specifies the type of data stored in the message.</typeparam>
        /// <param name="exc">The Exception whose Data collection should store message information.</param>
        /// <param name="messageValue">The message information to be stored.</param>
        /// <param name="targetInnerExceptions">Whether to store the data into the exception's inner exception(s) in addition to the exception itself.</param>
        internal static void StoreDataflowMessageValueIntoExceptionData<T>(Exception exc, T messageValue, bool targetInnerExceptions = false)
        {
            Debug.Assert(exc != null, "The exception into which data should be stored must be provided.");
 
            // Get the string value to store
            string? strValue = messageValue as string;
            if (strValue == null && messageValue != null)
            {
                try
                {
                    strValue = messageValue.ToString();
                }
                catch { /* It's ok to eat all exceptions here.  If ToString throws, we'll just ignore it. */ }
            }
            if (strValue == null) return;
 
            // Store the data into the exception itself
            StoreStringIntoExceptionData(exc, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
 
            // If we also want to target inner exceptions...
            if (targetInnerExceptions)
            {
                // If this is an aggregate, store into all inner exceptions.
                var aggregate = exc as AggregateException;
                if (aggregate != null)
                {
                    foreach (Exception innerException in aggregate.InnerExceptions)
                    {
                        StoreStringIntoExceptionData(innerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
                    }
                }
                // Otherwise, if there's an Exception.InnerException, store into that.
                else if (exc.InnerException != null)
                {
                    StoreStringIntoExceptionData(exc.InnerException, Common.EXCEPTIONDATAKEY_DATAFLOWMESSAGEVALUE, strValue);
                }
            }
        }
 
        /// <summary>Stores the specified string value into the specified key slot of the specified exception's data dictionary.</summary>
        /// <param name="exception">The exception into which the key/value should be stored.</param>
        /// <param name="key">The key.</param>
        /// <param name="value">The value to be serialized as a string and stored.</param>
        /// <remarks>If the key is already present in the exception's data dictionary, the value is not overwritten.</remarks>
        private static void StoreStringIntoExceptionData(Exception exception, string key, string value)
        {
            Debug.Assert(exception != null, "An exception is needed to store the data into.");
            Debug.Assert(key != null, "A key into the exception's data collection is needed.");
            Debug.Assert(value != null, "The value to store must be provided.");
            try
            {
                IDictionary data = exception.Data;
                if (data != null && !data.IsFixedSize && !data.IsReadOnly && data[key] == null)
                {
                    data[key] = value;
                }
            }
            catch
            {
                // It's ok to eat all exceptions here.  This could throw if an Exception type
                // has overridden Data to behave differently than we expect.
            }
        }
 
        /// <summary>Throws an exception asynchronously on the thread pool.</summary>
        /// <param name="error">The exception to throw.</param>
        /// <remarks>
        /// This function is used when an exception needs to be propagated from a thread
        /// other than the current context.  This could happen, for example, if the exception
        /// should cause the standard CLR exception escalation behavior, but we're inside
        /// of a task that will squirrel the exception away.
        /// </remarks>
        internal static void ThrowAsync(Exception error)
        {
            ExceptionDispatchInfo edi = ExceptionDispatchInfo.Capture(error);
            ThreadPool.QueueUserWorkItem(static state => { ((ExceptionDispatchInfo)state!).Throw(); }, edi);
        }
 
        /// <summary>Adds the exception to the list, first initializing the list if the list is null.</summary>
        /// <param name="list">The list to add the exception to, and initialize if null.</param>
        /// <param name="exception">The exception to add or whose inner exception(s) should be added.</param>
        /// <param name="unwrapInnerExceptions">Unwrap and add the inner exception(s) rather than the specified exception directly.</param>
        /// <remarks>This method is not thread-safe, in that it manipulates <paramref name="list"/> without any synchronization.</remarks>
        internal static void AddException([NotNull] ref List<Exception>? list, Exception exception, bool unwrapInnerExceptions = false)
        {
            Debug.Assert(exception != null, "An exception to add is required.");
            Debug.Assert(!unwrapInnerExceptions || exception.InnerException != null,
                "If unwrapping is requested, an inner exception is required.");
 
            // Make sure the list of exceptions is initialized (lazily).
            list ??= new List<Exception>();
 
            if (unwrapInnerExceptions)
            {
                AggregateException? aggregate = exception as AggregateException;
                if (aggregate != null)
                {
                    list.AddRange(aggregate.InnerExceptions);
                }
                else
                {
                    list.Add(exception.InnerException!);
                }
            }
            else list.Add(exception);
        }
 
        /// <summary>Creates a task we can cache for the desired Boolean result.</summary>
        /// <param name="value">The value of the Boolean.</param>
        /// <returns>A task that may be cached.</returns>
        private static Task<bool> CreateCachedBooleanTask(bool value)
        {
            // AsyncTaskMethodBuilder<Boolean> caches tasks that are non-disposable.
            // By using these same tasks, we're a bit more robust against disposals,
            // in that such a disposed task's ((IAsyncResult)task).AsyncWaitHandle
            // is still valid.
            var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<bool>.Create();
            atmb.SetResult(value);
            return atmb.Task; // must be accessed after SetResult to get the cached task
        }
 
        /// <summary>Creates a TaskCompletionSource{T} completed with a value of default(T) that we can cache.</summary>
        /// <returns>Completed TaskCompletionSource{T} that may be cached.</returns>
        private static TaskCompletionSource<T> CreateCachedTaskCompletionSource<T>()
        {
            var tcs = new TaskCompletionSource<T>();
            tcs.SetResult(default(T)!);
            return tcs;
        }
 
        /// <summary>Creates a task faulted with the specified exception.</summary>
        /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
        /// <param name="exception">The exception with which to complete the task.</param>
        /// <returns>The faulted task.</returns>
        internal static Task<TResult> CreateTaskFromException<TResult>(Exception exception)
        {
            var atmb = System.Runtime.CompilerServices.AsyncTaskMethodBuilder<TResult>.Create();
            atmb.SetException(exception);
            return atmb.Task;
        }
 
        /// <summary>Creates a task canceled with the specified cancellation token.</summary>
        /// <typeparam name="TResult">Specifies the type of the result for this task.</typeparam>
        /// <returns>The canceled task.</returns>
        internal static Task<TResult> CreateTaskFromCancellation<TResult>(CancellationToken cancellationToken)
        {
            Debug.Assert(cancellationToken.IsCancellationRequested,
                "The task will only be immediately canceled if the token has cancellation requested already.");
            var t = new Task<TResult>(CachedGenericDelegates<TResult>.DefaultTResultFunc, cancellationToken);
            Debug.Assert(t.IsCanceled, "Task's constructor should cancel the task synchronously in the ctor.");
            return t;
        }
 
        /// <summary>Gets the completion task of a block, and protects against common cases of the completion task not being implemented or supported.</summary>
        /// <param name="block">The block.</param>
        /// <returns>The completion task, or null if the block's completion task is not implemented or supported.</returns>
        internal static Task? GetPotentiallyNotSupportedCompletionTask(IDataflowBlock block)
        {
            Debug.Assert(block != null, "We need a block from which to retrieve a cancellation task.");
            try
            {
                return block.Completion;
            }
            catch (NotImplementedException) { }
            catch (NotSupportedException) { }
            return null;
        }
 
        /// <summary>
        /// Creates an IDisposable that, when disposed, will acquire the outgoing lock while removing
        /// the target block from the target registry.
        /// </summary>
        /// <typeparam name="TOutput">Specifies the type of data in the block.</typeparam>
        /// <param name="outgoingLock">The outgoing lock used to protect the target registry.</param>
        /// <param name="targetRegistry">The target registry from which the target should be removed.</param>
        /// <param name="targetBlock">The target to remove from the registry.</param>
        /// <returns>An IDisposable that will unregister the target block from the registry while holding the outgoing lock.</returns>
        internal static IDisposable CreateUnlinker<TOutput>(object outgoingLock, TargetRegistry<TOutput> targetRegistry, ITargetBlock<TOutput> targetBlock)
        {
            Debug.Assert(outgoingLock != null, "Monitor object needed to protect the operation.");
            Debug.Assert(targetRegistry != null, "Registry from which to remove is required.");
            Debug.Assert(targetBlock != null, "Target block to unlink is required.");
            return Disposables.Create(CachedGenericDelegates<TOutput>.CreateUnlinkerShimAction,
                outgoingLock, targetRegistry, targetBlock);
        }
 
        /// <summary>An infinite TimeSpan.</summary>
        internal static readonly TimeSpan InfiniteTimeSpan = Timeout.InfiniteTimeSpan;
 
        /// <summary>Validates that a timeout either is -1 or is non-negative and within the range of an Int32.</summary>
        /// <param name="timeout">The timeout to validate.</param>
        /// <returns>true if the timeout is valid; otherwise, false.</returns>
        internal static bool IsValidTimeout(TimeSpan timeout)
        {
            long millisecondsTimeout = (long)timeout.TotalMilliseconds;
            return millisecondsTimeout >= Timeout.Infinite && millisecondsTimeout <= int.MaxValue;
        }
 
        /// <summary>Gets the options to use for continuation tasks.</summary>
        /// <param name="toInclude">Any options to include in the result.</param>
        /// <returns>The options to use.</returns>
        internal static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude = TaskContinuationOptions.None)
        {
            return toInclude | TaskContinuationOptions.DenyChildAttach;
        }
 
        /// <summary>Gets the options to use for tasks.</summary>
        /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
        /// <remarks>
        /// These options should be used for all tasks that have the potential to run user code or
        /// that are repeatedly spawned and thus need a modicum of fair treatment.
        /// </remarks>
        /// <returns>The options to use.</returns>
        internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
        {
            TaskCreationOptions options = TaskCreationOptions.DenyChildAttach;
            if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
            return options;
        }
 
        /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
        /// <param name="task">Task to be started.</param>
        /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
        /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
        internal static Exception? StartTaskSafe(Task task, TaskScheduler scheduler)
        {
            Debug.Assert(task != null, "Task to start is required.");
            Debug.Assert(scheduler != null, "Scheduler on which to start the task is required.");
 
            if (scheduler == TaskScheduler.Default)
            {
                task.Start(scheduler);
                return null; // We don't need to worry about scheduler exceptions from the default scheduler.
            }
            // Slow path with try/catch separated out so that StartTaskSafe may be inlined in the common case.
            else return StartTaskSafeCore(task, scheduler);
        }
 
        /// <summary>Starts an already constructed task with handling and observing exceptions that may come from the scheduling process.</summary>
        /// <param name="task">Task to be started.</param>
        /// <param name="scheduler">TaskScheduler to schedule the task on.</param>
        /// <returns>null on success, an exception reference on scheduling error. In the latter case, the task reference is nulled out.</returns>
        private static Exception? StartTaskSafeCore(Task task, TaskScheduler scheduler)
        {
            Debug.Assert(task != null, "Task to start is needed.");
            Debug.Assert(scheduler != null, "Scheduler on which to start the task is required.");
 
            Exception? schedulingException = null;
 
            try
            {
                task.Start(scheduler);
            }
            catch (Exception caughtException)
            {
                // Verify TPL has faulted the task
                Debug.Assert(task.IsFaulted, "The task should have been faulted if it failed to start.");
 
                // Observe the task's exception
                _ = task.Exception;
 
                schedulingException = caughtException;
            }
 
            return schedulingException;
        }
 
        /// <summary>Pops and explicitly releases postponed messages after the block is done with processing.</summary>
        /// <remarks>No locks should be held at this time. Unfortunately we cannot assert that.</remarks>
        internal static void ReleaseAllPostponedMessages<T>(ITargetBlock<T> target,
                                    QueuedMap<ISourceBlock<T>, DataflowMessageHeader> postponedMessages,
                                    ref List<Exception>? exceptions)
        {
            Debug.Assert(target != null, "There must be a subject target.");
            Debug.Assert(postponedMessages != null, "The stacked map of postponed messages must exist.");
 
            // Note that we don't synchronize on lockObject for postponedMessages here,
            // because no one should be adding to it at this time.  We do a bit of
            // checking just for sanity's sake.
            int initialCount = postponedMessages.Count;
            int processedCount = 0;
 
            KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
            while (postponedMessages.TryPop(out sourceAndMessage))
            {
                // Loop through all postponed messages declining each messages.
                // The only way we have to do this is by reserving and then immediately releasing each message.
                // This is important for sources like SendAsyncSource, which keep state around until
                // they get a response to a postponed message.
                try
                {
                    Debug.Assert(sourceAndMessage.Key != null, "Postponed messages must have an associated source.");
                    if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, target))
                    {
                        sourceAndMessage.Key.ReleaseReservation(sourceAndMessage.Value, target);
                    }
                }
                catch (Exception exc)
                {
                    Common.AddException(ref exceptions, exc);
                }
 
                processedCount++;
            }
 
            Debug.Assert(processedCount == initialCount,
                "We should have processed the exact number of elements that were initially there.");
        }
 
        /// <summary>Cache ThrowAsync to avoid allocations when it is passed into PropagateCompletionXxx.</summary>
        internal static readonly Action<Exception> AsyncExceptionHandler = ThrowAsync;
 
        /// <summary>
        /// Propagates completion of sourceCompletionTask to target synchronously.
        /// </summary>
        /// <param name="sourceCompletionTask">The task whose completion is to be propagated. It must be completed.</param>
        /// <param name="target">The block where completion is propagated.</param>
        /// <param name="exceptionHandler">Handler for exceptions from the target. May be null which would propagate the exception to the caller.</param>
        internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target, Action<Exception>? exceptionHandler)
        {
            Debug.Assert(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
            Debug.Assert(target != null, "The target where completion is to be propagated may not be null.");
            Debug.Assert(sourceCompletionTask.IsCompleted, "sourceCompletionTask must be completed in order to propagate its completion.");
 
            AggregateException? exception = sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;
 
            try
            {
                if (exception != null) target.Fault(exception);
                else target.Complete();
            }
            catch (Exception exc)
            {
                if (exceptionHandler != null) exceptionHandler(exc);
                else throw;
            }
        }
 
        /// <summary>
        /// Creates a continuation off sourceCompletionTask to complete target. See PropagateCompletion.
        /// </summary>
        private static void PropagateCompletionAsContinuation(Task sourceCompletionTask, IDataflowBlock target)
        {
            Debug.Assert(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
            Debug.Assert(target != null, "The target where completion is to be propagated may not be null.");
            sourceCompletionTask.ContinueWith(static (task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state!, AsyncExceptionHandler),
                target, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
        }
 
        /// <summary>
        /// Propagates completion of sourceCompletionTask to target based on sourceCompletionTask's current state. See PropagateCompletion.
        /// </summary>
        internal static void PropagateCompletionOnceCompleted(Task sourceCompletionTask, IDataflowBlock target)
        {
            Debug.Assert(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
            Debug.Assert(target != null, "The target where completion is to be propagated may not be null.");
 
            // If sourceCompletionTask is completed, propagate completion synchronously.
            // Otherwise hook up a continuation.
            if (sourceCompletionTask.IsCompleted) PropagateCompletion(sourceCompletionTask, target, exceptionHandler: null);
            else PropagateCompletionAsContinuation(sourceCompletionTask, target);
        }
 
        /// <summary>Static class used to cache generic delegates the C# compiler doesn't cache by default.</summary>
        /// <remarks>Without this, we end up allocating the generic delegate each time the operation is used.</remarks>
        private static class CachedGenericDelegates<T>
        {
            /// <summary>A function that returns the default value of T.</summary>
            internal static readonly Func<T> DefaultTResultFunc = static () => default(T)!;
            /// <summary>
            /// A function to use as the body of ActionOnDispose in CreateUnlinkerShim.
            /// Passed a tuple of the sync obj, the target registry, and the target block as the state parameter.
            /// </summary>
            internal static readonly Action<object, TargetRegistry<T>, ITargetBlock<T>> CreateUnlinkerShimAction =
                (syncObj, registry, target) =>
            {
                lock (syncObj) registry.Remove(target);
            };
        }
    }
 
    /// <summary>State used only when bounding.</summary>
    [DebuggerDisplay("BoundedCapacity = {BoundedCapacity}}")]
    internal class BoundingState
    {
        /// <summary>The maximum number of messages allowed to be buffered.</summary>
        internal readonly int BoundedCapacity;
        /// <summary>The number of messages currently stored.</summary>
        /// <remarks>
        /// This value may temporarily be higher than the actual number stored.
        /// That's ok, we just can't accept any new messages if CurrentCount >= BoundedCapacity.
        /// Worst case is that we may temporarily have fewer items in the block than our maximum allows,
        /// but we'll never have more.
        /// </remarks>
        internal int CurrentCount;
 
        /// <summary>Initializes the BoundingState.</summary>
        /// <param name="boundedCapacity">The positive bounded capacity.</param>
        internal BoundingState(int boundedCapacity)
        {
            Debug.Assert(boundedCapacity > 0, "Bounded is only supported with positive values.");
            BoundedCapacity = boundedCapacity;
        }
 
        /// <summary>Gets whether there's room available to add another message.</summary>
        internal bool CountIsLessThanBound { get { return CurrentCount < BoundedCapacity; } }
    }
 
    /// <summary>Stated used only when bounding and when postponed messages are stored.</summary>
    /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
    [DebuggerDisplay("BoundedCapacity = {BoundedCapacity}, PostponedMessages = {PostponedMessagesCountForDebugger}")]
    internal class BoundingStateWithPostponed<TInput> : BoundingState
    {
        /// <summary>Queue of postponed messages.</summary>
        internal readonly QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages =
            new QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>();
        /// <summary>
        /// The number of transfers from the postponement queue to the input queue currently being processed.
        /// </summary>
        /// <remarks>
        /// Blocks that use TargetCore need to transfer messages from the postponed queue to the input messages
        /// queue.  While doing that, new incoming messages may arrive, and if they view the postponed queue
        /// as being empty (after the block has removed the last postponed message and is consuming it, before
        /// storing it into the input queue), they might go directly into the input queue... that will then mess
        /// up the ordering between those postponed messages and the newly incoming messages.  To address that,
        /// OutstandingTransfers is used to track the number of transfers currently in progress.  Incoming
        /// messages must be postponed not only if there are already any postponed messages, but also if
        /// there are any transfers in progress (i.e. this value is > 0).  It's an integer because the DOP could
        /// be greater than 1, and thus we need to ref count multiple transfers that might be in progress.
        /// </remarks>
        internal int OutstandingTransfers;
 
        /// <summary>Initializes the BoundingState.</summary>
        /// <param name="boundedCapacity">The positive bounded capacity.</param>
        internal BoundingStateWithPostponed(int boundedCapacity) : base(boundedCapacity)
        {
        }
 
        /// <summary>Gets the number of postponed messages for the debugger.</summary>
        private int PostponedMessagesCountForDebugger { get { return PostponedMessages.Count; } }
    }
 
    /// <summary>Stated used only when bounding and when postponed messages and a task are stored.</summary>
    /// <typeparam name="TInput">Specifies the type of input messages.</typeparam>
    internal sealed class BoundingStateWithPostponedAndTask<TInput> : BoundingStateWithPostponed<TInput>
    {
        /// <summary>The task used to process messages.</summary>
        internal Task? TaskForInputProcessing;
 
        /// <summary>Initializes the BoundingState.</summary>
        /// <param name="boundedCapacity">The positive bounded capacity.</param>
        internal BoundingStateWithPostponedAndTask(int boundedCapacity) : base(boundedCapacity)
        {
        }
    }
 
    /// <summary>
    /// Type used with TaskCompletionSource(Of TResult) as the TResult
    /// to ensure that the resulting task can't be upcast to something
    /// that in the future could lead to compat problems.
    /// </summary>
    [DebuggerNonUserCode]
    internal readonly struct VoidResult { }
}