File: Internal\TargetRegistry.cs
Web Access
Project: src\src\libraries\System.Threading.Tasks.Dataflow\src\System.Threading.Tasks.Dataflow.csproj (System.Threading.Tasks.Dataflow)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TargetRegistry.cs
//
//
// A store of registered targets with a target block.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
 
namespace System.Threading.Tasks.Dataflow.Internal
{
    /// <summary>Stores targets registered with a source.</summary>
    /// <typeparam name="T">Specifies the type of data accepted by the targets.</typeparam>
    /// <remarks>This type is not thread-safe.</remarks>
    [DebuggerDisplay("Count = {Count}")]
    [DebuggerTypeProxy(typeof(TargetRegistry<>.DebugView))]
    internal sealed class TargetRegistry<T>
    {
        /// <summary>
        /// Per-link bookkeeping for a single registered target. Every instance also acts as a
        /// node of a doubly-linked list through its <see cref="Previous"/> and <see cref="Next"/> references.
        /// </summary>
        internal sealed class LinkedTargetInfo
        {
            /// <summary>The target block this entry describes.</summary>
            internal readonly ITargetBlock<T> Target;
            /// <summary>The PropagateCompletion link option, captured when the entry was created.</summary>
            internal readonly bool PropagateCompletion;
            /// <summary>
            /// The number of messages that may still be propagated over this link.
            /// It starts out as the MaxMessages link option and is counted down by the registry.
            /// </summary>
            internal int RemainingMessages;
            /// <summary>The node preceding this one in the list, or null if there is none.</summary>
            internal LinkedTargetInfo? Previous;
            /// <summary>The node following this one in the list, or null if there is none.</summary>
            internal LinkedTargetInfo? Next;

            /// <summary>Creates an unlinked entry for the given target.</summary>
            /// <param name="target">The target block this entry describes.</param>
            /// <param name="linkOptions">The options the link was created with.</param>
            internal LinkedTargetInfo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
            {
                Debug.Assert(target != null, "The target that is supposed to be linked must not be null.");
                Debug.Assert(linkOptions != null, "The linkOptions must not be null.");

                // Snapshot the options now; the entry never consults linkOptions again
                RemainingMessages = linkOptions.MaxMessages;
                PropagateCompletion = linkOptions.PropagateCompletion;
                Target = target;
            }
        }
 
        /// <summary>The source block that owns this registry. It is passed to ETW events and to any NopLinkPropagator created by Add.</summary>
        private readonly ISourceBlock<T> _owningSource;
        /// <summary>Maps each registered target to its list node, giving keyed lookup for Contains and Remove.</summary>
        private readonly Dictionary<ITargetBlock<T>, LinkedTargetInfo> _targetInformation;
        /// <summary>The first node of an ordered list of targets. Messages should be offered to targets starting from First and following Next.</summary>
        private LinkedTargetInfo? _firstTarget;
        /// <summary>The last node of the ordered list of targets. This field is used purely as a perf optimization to avoid traversing the list for each Add.</summary>
        private LinkedTargetInfo? _lastTarget;
        /// <summary>Number of links with positive RemainingMessages counters.
        /// This is an optimization that allows Remove to skip the dictionary lookup
        /// for implicit unlink requests when this counter is 0.</summary>
        private int _linksWithRemainingMessages;
 
        /// <summary>Creates an empty registry on behalf of a source block.</summary>
        /// <param name="owningSource">The source block whose linked targets this registry tracks.</param>
        internal TargetRegistry(ISourceBlock<T> owningSource)
        {
            Debug.Assert(owningSource != null, "The TargetRegistry instance must be owned by a source block.");

            // The linked list starts out empty (both ends null); only the lookup table needs allocating
            _targetInformation = new Dictionary<ITargetBlock<T>, LinkedTargetInfo>();
            _owningSource = owningSource;
        }
 
        /// <summary>Registers a target with this registry.</summary>
        /// <param name="target">
        /// The target to register. If the same target is already registered, this reference is
        /// replaced with a new pass-through wrapper around it, and the wrapper is registered instead.
        /// </param>
        /// <param name="linkOptions">The options of the link; Append decides at which end of the list the target is placed.</param>
        internal void Add(ref ITargetBlock<T> target, DataflowLinkOptions linkOptions)
        {
            Debug.Assert(target != null, "The target that is supposed to be linked must not be null.");
            Debug.Assert(linkOptions != null, "The link options must not be null.");

            // Dictionary keys must be unique, so a repeated target is wrapped in a fresh NopLinkPropagator
            if (_targetInformation.ContainsKey(target))
            {
                target = new NopLinkPropagator(_owningSource, target);
            }

            // The list keeps the offering order, the dictionary provides lookup by target
            var entry = new LinkedTargetInfo(target, linkOptions);
            AddToList(entry, linkOptions.Append);
            _targetInformation.Add(target, entry);

            // Track how many links carry a positive message budget
            Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
            if (entry.RemainingMessages > 0)
            {
                _linksWithRemainingMessages++;
            }

            DataflowEtwProvider log = DataflowEtwProvider.Log;
            if (log.IsEnabled())
            {
                log.DataflowBlockLinking(_owningSource, target);
            }
        }
 
        /// <summary>Determines whether a target is currently registered.</summary>
        /// <param name="target">The target to look up.</param>
        /// <returns>true if the target is a key of the registry's lookup table; otherwise, false.</returns>
        internal bool Contains(ITargetBlock<T> target) => _targetInformation.ContainsKey(target);
 
        /// <summary>Unregisters a target, either unconditionally or only if its message budget is used up.</summary>
        /// <param name="target">The target to unregister.</param>
        /// <param name="onlyIfReachedMaxMessages">
        /// When true, the request is an implicit unlink: it is ignored outright if no link
        /// is tracking a positive number of remaining messages.
        /// </param>
        internal void Remove(ITargetBlock<T> target, bool onlyIfReachedMaxMessages = false)
        {
            Debug.Assert(target != null, "Target to remove is required.");
            Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");

            // An implicit unlink has nothing to do when no link is counting messages
            bool nothingToUnlinkImplicitly = onlyIfReachedMaxMessages && _linksWithRemainingMessages == 0;
            if (!nothingToUnlinkImplicitly)
            {
                // Needs the dictionary lookup
                Remove_Slow(target, onlyIfReachedMaxMessages);
            }
        }
 
        /// <summary>
        /// Looks the target up and either unregisters it or, for an implicit unlink that has not
        /// yet exhausted the link's message budget, counts down the remaining messages.
        /// </summary>
        /// <param name="target">The target to remove. Unregistered targets are ignored.</param>
        /// <param name="onlyIfReachedMaxMessages">
        /// When true, the target is removed only if exactly one message remained on its link;
        /// otherwise a positive remaining-message count is decremented and the target stays.
        /// When false, the target is removed unconditionally.
        /// </param>
        private void Remove_Slow(ITargetBlock<T> target, bool onlyIfReachedMaxMessages)
        {
            Debug.Assert(target != null, "Target to remove is required.");

            // Make sure we've intended to go the slow route
            Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
            Debug.Assert(!onlyIfReachedMaxMessages || _linksWithRemainingMessages > 0, "We shouldn't have ended on the slow path.");

            // If the target is registered...
            if (_targetInformation.TryGetValue(target, out LinkedTargetInfo? node))
            {
                Debug.Assert(node != null, "The LinkedTargetInfo node referenced in the Dictionary must be non-null.");

                // Remove the target, if either there's no constraint on the removal
                // or if this was the last remaining message.
                if (!onlyIfReachedMaxMessages || node.RemainingMessages == 1)
                {
                    RemoveFromList(node);
                    _targetInformation.Remove(target);

                    // Mirror the increment in Add: a link that was counted there had a positive
                    // RemainingMessages, and that value never drops below 1 while the link is registered
                    // (the branch below only decrements values greater than 1). Testing for 0 here would
                    // never match, leaving the counter stuck and the fast path in Remove unreachable.
                    if (node.RemainingMessages > 0) _linksWithRemainingMessages--;
                    Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
                    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
                    if (etwLog.IsEnabled())
                    {
                        etwLog.DataflowBlockUnlinking(_owningSource, target);
                    }
                }
                // If the target is to stay and we are counting the remaining messages for this link, decrement the counter
                else if (node.RemainingMessages > 0)
                {
                    Debug.Assert(node.RemainingMessages > 1, "The target should have been removed, because there are no remaining messages.");
                    node.RemainingMessages--;
                }
            }
        }
 
        /// <summary>
        /// Empties the registry by dropping its references into the linked list, without touching
        /// the links between the nodes themselves, so the detached list can still be walked.
        /// </summary>
        /// <returns>The node that was first in the list before clearing, or null if the list was empty.</returns>
        internal LinkedTargetInfo? ClearEntryPoints()
        {
            // Remember the head so the caller can still traverse the detached list
            LinkedTargetInfo? detachedHead = _firstTarget;

            // Reset all registry state
            Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
            _linksWithRemainingMessages = 0;
            _targetInformation.Clear();
            _lastTarget = null;
            _firstTarget = null;

            return detachedHead;
        }
 
        /// <summary>Propagates the owning source's completion to the targets of the given linked list.</summary>
        /// <param name="firstTarget">The head of a saved linked list; may be null, in which case nothing happens.</param>
        internal void PropagateCompletion(LinkedTargetInfo? firstTarget)
        {
            Debug.Assert(_owningSource.Completion.IsCompleted, "The owning source must have completed before propagating completion.");

            // Read the completion task once rather than on every iteration
            Task completion = _owningSource.Completion;

            // Walk the list and notify only the targets whose link asked for completion propagation
            LinkedTargetInfo? current = firstTarget;
            while (current != null)
            {
                if (current.PropagateCompletion)
                {
                    Common.PropagateCompletion(completion, current.Target, Common.AsyncExceptionHandler);
                }

                current = current.Next;
            }
        }
 
        /// <summary>Gets the head of the ordered target list, or null when the list is empty.</summary>
        internal LinkedTargetInfo? FirstTargetNode => _firstTarget;
 
        /// <summary>Inserts a node at one end of the doubly-linked target list.</summary>
        /// <param name="node">The node to insert.</param>
        /// <param name="append">true to place the node at the tail of the list; false to place it at the head.</param>
        internal void AddToList(LinkedTargetInfo node, bool append)
        {
            Debug.Assert(node != null, "Requires a node to be added.");

            if (_firstTarget != null || _lastTarget != null)
            {
                // Non-empty list: validate the ends before splicing the node in
                Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must either be null or non-null.");
                Debug.Assert(_lastTarget.Next == null, "The last node must not have a successor.");
                Debug.Assert(_firstTarget.Previous == null, "The first node must not have a predecessor.");

                if (!append)
                {
                    // The node becomes the new head
                    node.Next = _firstTarget;
                    _firstTarget.Previous = node;
                    _firstTarget = node;
                }
                else
                {
                    // The node becomes the new tail
                    node.Previous = _lastTarget;
                    _lastTarget.Next = node;
                    _lastTarget = node;
                }
            }
            else
            {
                // Empty list: the node is both head and tail
                _firstTarget = node;
                _lastTarget = node;
            }

            Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null after AddToList.");
        }
 
        /// <summary>Unlinks a node from the doubly-linked target list and clears the node's own neighbor references.</summary>
        /// <param name="node">The node to unlink.</param>
        internal void RemoveFromList(LinkedTargetInfo node)
        {
            Debug.Assert(node != null, "Node to remove is required.");
            Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null before RemoveFromList.");

            // Capture both neighbors up front, since the node's references are cleared below
            LinkedTargetInfo? before = node.Previous;
            LinkedTargetInfo? after = node.Next;

            // Bridge the predecessor over the node
            if (before != null)
            {
                before.Next = after;
                node.Previous = null;
            }

            // Bridge the successor over the node
            if (after != null)
            {
                after.Previous = before;
                node.Next = null;
            }

            // If the node was at either end, move that end to the surviving neighbor
            if (_firstTarget == node)
            {
                _firstTarget = after;
            }

            if (_lastTarget == node)
            {
                _lastTarget = before;
            }

            Debug.Assert((_firstTarget != null) == (_lastTarget != null), "Both first and last node must either be null or non-null after RemoveFromList.");
        }
 
        /// <summary>Gets the number of registered targets, as reported by the lookup table.</summary>
        private int Count => _targetInformation.Count;
 
        /// <summary>Gets a snapshot array of the registered targets, in list order, for display in a debugger.</summary>
        private ITargetBlock<T>[] TargetsForDebugger
        {
            get
            {
                // Sized from the lookup table, filled by walking the list from its head
                var snapshot = new ITargetBlock<T>[Count];
                int index = 0;
                LinkedTargetInfo? current = _firstTarget;
                while (current != null)
                {
                    snapshot[index] = current.Target;
                    index++;
                    current = current.Next;
                }

                return snapshot;
            }
        }
 
 
 
        /// <summary>
        /// A pass-through block that TargetRegistry places in front of a target. It forwards offered
        /// messages to the wrapped target and routes the target's source callbacks back to the owning source.
        /// </summary>
        [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
        [DebuggerTypeProxy(typeof(TargetRegistry<>.NopLinkPropagator.DebugView))]
        private sealed class NopLinkPropagator : IPropagatorBlock<T, T>, ISourceBlock<T>, IDebuggerDisplay
        {
            /// <summary>The source block on whose behalf this pass-through operates.</summary>
            private readonly ISourceBlock<T> _owningSource;
            /// <summary>The wrapped target that ultimately receives the messages.</summary>
            private readonly ITargetBlock<T> _target;

            /// <summary>Creates a pass-through between a source and a target.</summary>
            /// <param name="owningSource">The source block on whose behalf this pass-through operates.</param>
            /// <param name="target">The target that offered messages are forwarded to.</param>
            internal NopLinkPropagator(ISourceBlock<T> owningSource, ITargetBlock<T> target)
            {
                Debug.Assert(owningSource != null, "Propagator must be associated with a source.");
                Debug.Assert(target != null, "Target to propagate to is required.");

                _target = target;
                _owningSource = owningSource;
            }

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
            DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
            {
                Debug.Assert(source == _owningSource, "Only valid to be used with the source for which it was created.");

                // Offer on as this instance, so the target's source callbacks arrive here
                DataflowMessageStatus status = _target.OfferMessage(messageHeader, messageValue, this, consumeToAccept);
                return status;
            }

            // The source callbacks below identify this instance, not the wrapped target,
            // as the target when calling into the owning source.

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
            T? ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed) =>
                _owningSource.ConsumeMessage(messageHeader, this, out messageConsumed);

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
            bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target) =>
                _owningSource.ReserveMessage(messageHeader, this);

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
            void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target) =>
                _owningSource.ReleaseReservation(messageHeader, this);

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
            Task IDataflowBlock.Completion => _owningSource.Completion;

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
            void IDataflowBlock.Complete() => _target.Complete();

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
            void IDataflowBlock.Fault(Exception exception) => _target.Fault(exception);

            /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
            IDisposable ISourceBlock<T>.LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) =>
                throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);

            /// <summary>Gets the text shown for this instance by the debugger display attribute.</summary>
            private object DebuggerDisplayContent
            {
                get
                {
                    var name = Common.GetNameForDebugger(this);

                    // Prefer a block's own debugger content when it offers one; otherwise show the block itself
                    object sourceDescription = _owningSource is IDebuggerDisplay sourceDisplay ? sourceDisplay.Content : _owningSource;
                    object targetDescription = _target is IDebuggerDisplay targetDisplay ? targetDisplay.Content : _target;

                    return $"{name} Source = \"{sourceDescription}\", Target = \"{targetDescription}\"";
                }
            }

            /// <summary>Gets the debugger display content of this instance.</summary>
            object IDebuggerDisplay.Content => DebuggerDisplayContent;

            /// <summary>Debugger type proxy that exposes the wrapped target of a pass-through.</summary>
            private sealed class DebugView
            {
                /// <summary>The pass-through being inspected.</summary>
                private readonly NopLinkPropagator _passthrough;

                /// <summary>Creates the debug view.</summary>
                /// <param name="passthrough">The pass-through to inspect.</param>
                public DebugView(NopLinkPropagator passthrough)
                {
                    Debug.Assert(passthrough != null, "Need a propagator with which to construct the debug view.");
                    _passthrough = passthrough;
                }

                /// <summary>Gets the target wrapped by the pass-through.</summary>
                public ITargetBlock<T> LinkedTarget => _passthrough._target;
            }
        }
 
 
        /// <summary>Debugger type proxy that presents a target registry as a flat list of its targets.</summary>
        private sealed class DebugView
        {
            /// <summary>The registry being inspected.</summary>
            private readonly TargetRegistry<T> _registry;

            /// <summary>Creates the type proxy.</summary>
            /// <param name="registry">The registry to inspect.</param>
            public DebugView(TargetRegistry<T> registry)
            {
                Debug.Assert(registry != null, "Need a registry with which to construct the debug view.");
                _registry = registry;
            }

            /// <summary>Gets the registered targets, in list order, shown directly under the registry in the debugger.</summary>
            [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
            public ITargetBlock<T>[] Targets => _registry.TargetsForDebugger;
        }
    }
}