File: System\Linq\Parallel\QueryOperators\Binary\IntersectQueryOperator.cs
Web Access
Project: src\src\libraries\System.Linq.Parallel\src\System.Linq.Parallel.csproj (System.Linq.Parallel)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IntersectQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the intersection of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    internal sealed class IntersectQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput>? _comparer; // An equality comparer.
 
        //---------------------------------------------------------------------------------------
        // Constructs a new intersection operator.
        //
 
        internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput>? comparer)
            : base(left, right)
        {
            Debug.Assert(left != null && right != null, "child data sources cannot be null");
 
            _comparer = comparer;
            _outputOrdered = LeftChild.OutputOrdered;
 
            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }
 
 
        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);
 
            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }
 
        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Debug.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);
 
            if (OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
        }
 
        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then call this method with that key as a generic parameter.
        //
 
        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;
 
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, _comparer, cancellationToken);
 
            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], _comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                            new IntersectQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], _comparer, cancellationToken);
                }
            }
 
            outputRecipient.Receive(outputStream);
        }
 
        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //
 
        internal override bool LimitsParallelism
        {
            get { return false; }
        }
 
        //---------------------------------------------------------------------------------------
        // This enumerator performs the intersection operation incrementally. It does this by
        // maintaining a history -- in the form of a set -- of all data already seen. It then
        // only returns elements that are seen twice (returning each one only once).
        //
 
        private sealed class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<TInputOutput>? _comparer; // Comparer to use for equality/hash-coding.
            private HashSet<TInputOutput>? _hashLookup; // The hash lookup, used to produce the intersection.
            private readonly CancellationToken _cancellationToken;
            private Shared<int>? _outputLoopCount;
 
            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection operator.
            //
 
            internal IntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput>? comparer, CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);
 
                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = comparer;
                _cancellationToken = cancellationToken;
            }
 
            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the intersection.
            //
 
            internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TInputOutput currentElement, ref int currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);
 
                // Build the set out of the right data source, if we haven't already.
 
                if (_hashLookup == null)
                {
                    _outputLoopCount = new Shared<int>(0);
                    _hashLookup = new HashSet<TInputOutput>(_comparer);
 
                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);
 
                    int i = 0;
                    while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            _cancellationToken.ThrowIfCancellationRequested();
 
                        _hashLookup.Add(rightElement.First);
                    }
                }
 
                // Now iterate over the left data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey keyUnused = default(TLeftKey)!;
 
                while (_leftSource.MoveNext(ref leftElement, ref keyUnused))
                {
                    Debug.Assert(_outputLoopCount != null);
                    if ((_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                        _cancellationToken.ThrowIfCancellationRequested();
 
                    // If we found the element in our set, and if we haven't returned it yet,
                    // we can yield it to the caller. We also mark it so we know we've returned
                    // it once already and never will again.
                    if (_hashLookup.Remove(leftElement.First))
                    {
                        currentElement = leftElement.First;
#if DEBUG
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }
 
                return false;
            }
 
            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Intersect(wrappedRightChild, _comparer);
        }
 
 
        private sealed class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<Wrapper<TInputOutput>> _comparer; // Comparer to use for equality/hash-coding.
            private readonly IComparer<TLeftKey> _leftKeyComparer; // Comparer to use to determine ordering of order keys.
            private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>? _hashLookup; // The hash lookup, used to produce the intersection.
            private readonly CancellationToken _cancellationToken;
 
            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection operator.
            //
 
            internal OrderedIntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput>? comparer, IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);
 
                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
                _leftKeyComparer = leftKeyComparer;
                _cancellationToken = cancellationToken;
            }
 
            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the intersection.
            //
 
            internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TInputOutput currentElement, [AllowNull] ref TLeftKey currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);
 
                // Build the set out of the left data source, if we haven't already.
                int i = 0;
                if (_hashLookup == null)
                {
                    _hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(_comparer);
 
                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey)!;
                    while (_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            _cancellationToken.ThrowIfCancellationRequested();
 
                        // For each element, we track the smallest order key for that element that we saw so far
                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);
 
                        // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                        // update the order key for this element.
                        if (!_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || _leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            _hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }
                }
 
                // Now iterate over the right data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                int rightKeyUnused = default(int);
                while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        _cancellationToken.ThrowIfCancellationRequested();
 
                    // If we found the element in our set, and if we haven't returned it yet,
                    // we can yield it to the caller. We also mark it so we know we've returned
                    // it once already and never will again.
 
                    Pair<TInputOutput, TLeftKey> entry;
                    Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);
 
                    if (_hashLookup.TryGetValue(wrappedRightElem, out entry))
                    {
                        currentElement = entry.First;
                        currentKey = entry.Second;
 
                        _hashLookup.Remove(new Wrapper<TInputOutput>(entry.First));
                        return true;
                    }
                }
 
                return false;
            }
 
            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }
    }
}