File: System\Linq\Parallel\Merging\OrderPreservingMergeHelper.cs
Web Access
Project: src\src\libraries\System.Linq.Parallel\src\System.Linq.Parallel.csproj (System.Linq.Parallel)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingMergeHelper.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// The order preserving merge helper guarantees the output stream is in a specific order. This is done
    /// by comparing keys from a set of already-sorted input partitions, and coalescing output data using
    /// incremental key comparisons.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    internal sealed class OrderPreservingMergeHelper<TInputOutput, TKey> : IMergeHelper<TInputOutput>
    {
        private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
        private readonly PartitionedStream<TInputOutput, TKey> _partitions; // Source partitions.
        private readonly Shared<TInputOutput[]?> _results; // The array where results are stored.
        private readonly TaskScheduler _taskScheduler; // The task manager to execute the query.
 
        //-----------------------------------------------------------------------------------
        // Instantiates a new merge helper.
        //
        // Arguments:
        //     partitions   - the source partitions from which to consume data.
        //     ignoreOutput - whether we're enumerating "for effect" or for output.
        //
 
        internal OrderPreservingMergeHelper(PartitionedStream<TInputOutput, TKey> partitions, TaskScheduler taskScheduler,
            CancellationState cancellationState, int queryId)
        {
            Debug.Assert(partitions != null);
 
            TraceHelpers.TraceInfo("KeyOrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper");
 
            _taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            _partitions = partitions;
            _results = new Shared<TInputOutput[]?>(null);
            _taskScheduler = taskScheduler;
        }
 
        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself.
        //
        // Arguments:
        //    ordinalIndexState - the state of the ordinal index of the merged partitions
        //
 
        void IMergeHelper<TInputOutput>.Execute()
        {
            OrderPreservingSpoolingTask<TInputOutput, TKey>.Spool(_taskGroupState, _partitions, _results, _taskScheduler);
        }
 
        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results.
        //
 
        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Debug.Assert(_results.Value != null);
            return ((IEnumerable<TInputOutput>)_results.Value).GetEnumerator();
        }
 
 
        //-----------------------------------------------------------------------------------
        // Returns the results as an array.
        //
 
        public TInputOutput[]? GetResultsAsArray()
        {
            return _results.Value;
        }
    }
}