File: System\Linq\Parallel\QueryOperators\Binary\ZipQueryOperator.cs
Web Access
Project: src\src\libraries\System.Linq.Parallel\src\System.Linq.Parallel.csproj (System.Linq.Parallel)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ZipQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// A Zip operator combines two input data sources into a single output stream,
    /// using a pairwise element matching algorithm. For example, the result of zipping
    /// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs,
    /// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element
    /// is matched with the element in the other data source at the same ordinal
    /// position, the zip operator requires order preservation.
    /// </summary>
    /// <typeparam name="TLeftInput"></typeparam>
    /// <typeparam name="TRightInput"></typeparam>
    /// <typeparam name="TOutput"></typeparam>
    internal sealed class ZipQueryOperator<TLeftInput, TRightInput, TOutput>
        : QueryOperator<TOutput>
    {
        private readonly Func<TLeftInput, TRightInput, TOutput> _resultSelector; // To select result elements.
        private readonly QueryOperator<TLeftInput> _leftChild;
        private readonly QueryOperator<TRightInput> _rightChild;
        private readonly bool _prematureMergeLeft; // Whether to prematurely merge the left data source
        private readonly bool _prematureMergeRight; // Whether to prematurely merge the right data source
        private readonly bool _limitsParallelism; // Whether this operator limits parallelism
 
        //---------------------------------------------------------------------------------------
        // Initializes a new zip operator.
        //
        // Arguments:
        //    leftChild     - the left data source from which to pull data.
        //    rightChild    - the right data source from which to pull data.
        //
 
        internal ZipQueryOperator(
            ParallelQuery<TLeftInput> leftChildSource, ParallelQuery<TRightInput> rightChildSource,
            Func<TLeftInput, TRightInput, TOutput> resultSelector)
            : this(
                QueryOperator<TLeftInput>.AsQueryOperator(leftChildSource),
                QueryOperator<TRightInput>.AsQueryOperator(rightChildSource),
                resultSelector)
        {
        }
 
        private ZipQueryOperator(
            QueryOperator<TLeftInput> left, QueryOperator<TRightInput> right,
            Func<TLeftInput, TRightInput, TOutput> resultSelector)
            : base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings))
        {
            Debug.Assert(resultSelector != null, "operator cannot be null");
 
            _leftChild = left;
            _rightChild = right;
            _resultSelector = resultSelector;
            _outputOrdered = _leftChild.OutputOrdered || _rightChild.OutputOrdered;
 
            OrdinalIndexState leftIndexState = _leftChild.OrdinalIndexState;
            OrdinalIndexState rightIndexState = _rightChild.OrdinalIndexState;
 
            _prematureMergeLeft = leftIndexState != OrdinalIndexState.Indexable;
            _prematureMergeRight = rightIndexState != OrdinalIndexState.Indexable;
            _limitsParallelism =
                (_prematureMergeLeft && leftIndexState != OrdinalIndexState.Shuffled)
                || (_prematureMergeRight && rightIndexState != OrdinalIndexState.Shuffled);
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the children and wrapping them with
        // partitions as needed.
        //
 
        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.
            QueryResults<TLeftInput> leftChildResults = _leftChild.Open(settings, preferStriping);
            QueryResults<TRightInput> rightChildResults = _rightChild.Open(settings, preferStriping);
 
            Debug.Assert(settings.DegreeOfParallelism != null);
            int partitionCount = settings.DegreeOfParallelism.Value;
            Debug.Assert(settings.TaskScheduler != null);
            if (_prematureMergeLeft)
            {
                PartitionedStreamMerger<TLeftInput> merger = new PartitionedStreamMerger<TLeftInput>(
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, _leftChild.OutputOrdered,
                    settings.CancellationState, settings.QueryId);
                leftChildResults.GivePartitionedStream(merger);
                Debug.Assert(merger.MergeExecutor != null);
                leftChildResults = new ListQueryResults<TLeftInput>(
                    merger.MergeExecutor.GetResultsAsArray()!, partitionCount, preferStriping);
            }
 
            if (_prematureMergeRight)
            {
                PartitionedStreamMerger<TRightInput> merger = new PartitionedStreamMerger<TRightInput>(
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, _rightChild.OutputOrdered,
                    settings.CancellationState, settings.QueryId);
                rightChildResults.GivePartitionedStream(merger);
                Debug.Assert(merger.MergeExecutor != null);
                rightChildResults = new ListQueryResults<TRightInput>(
                    merger.MergeExecutor.GetResultsAsArray()!, partitionCount, preferStriping);
            }
 
            return new ZipQueryOperatorResults(leftChildResults, rightChildResults, _resultSelector, partitionCount, preferStriping);
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            using (IEnumerator<TLeftInput> leftEnumerator = _leftChild.AsSequentialQuery(token).GetEnumerator())
            using (IEnumerator<TRightInput> rightEnumerator = _rightChild.AsSequentialQuery(token).GetEnumerator())
            {
                while (leftEnumerator.MoveNext() && rightEnumerator.MoveNext())
                {
                    yield return _resultSelector(leftEnumerator.Current, rightEnumerator.Current);
                }
            }
        }
 
        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator.
        //
 
        internal override OrdinalIndexState OrdinalIndexState
        {
            get
            {
                return OrdinalIndexState.Indexable;
            }
        }
 
        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //
 
        internal override bool LimitsParallelism
        {
            get
            {
                return _limitsParallelism;
            }
        }
 
        //---------------------------------------------------------------------------------------
        // A special QueryResults class for the Zip operator. It requires that both of the child
        // QueryResults are indexable.
        //
 
        internal sealed class ZipQueryOperatorResults : QueryResults<TOutput>
        {
            private readonly QueryResults<TLeftInput> _leftChildResults;
            private readonly QueryResults<TRightInput> _rightChildResults;
            private readonly Func<TLeftInput, TRightInput, TOutput> _resultSelector; // To select result elements.
            private readonly int _count;
            private readonly int _partitionCount;
            private readonly bool _preferStriping;
 
            internal ZipQueryOperatorResults(
                QueryResults<TLeftInput> leftChildResults, QueryResults<TRightInput> rightChildResults,
                Func<TLeftInput, TRightInput, TOutput> resultSelector, int partitionCount, bool preferStriping)
            {
                _leftChildResults = leftChildResults;
                _rightChildResults = rightChildResults;
                _resultSelector = resultSelector;
                _partitionCount = partitionCount;
                _preferStriping = preferStriping;
 
                Debug.Assert(_leftChildResults.IsIndexible);
                Debug.Assert(_rightChildResults.IsIndexible);
 
                _count = Math.Min(_leftChildResults.Count, _rightChildResults.Count);
            }
 
            internal override int ElementsCount
            {
                get { return _count; }
            }
 
            internal override bool IsIndexible
            {
                get { return true; }
            }
 
            internal override TOutput GetElement(int index)
            {
                return _resultSelector(_leftChildResults.GetElement(index), _rightChildResults.GetElement(index));
            }
 
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TOutput> recipient)
            {
                PartitionedStream<TOutput, int> partitionedStream = ExchangeUtilities.PartitionDataSource(this, _partitionCount, _preferStriping);
                recipient.Receive(partitionedStream);
            }
        }
    }
}