File: System\Linq\Parallel\QueryOperators\Unary\LastQueryOperator.cs
Web Access
Project: src\src\libraries\System.Linq.Parallel\src\System.Linq.Parallel.csproj (System.Linq.Parallel)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// LastQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// Last tries to discover the last element in the source, optionally matching a
    /// predicate.  All partitions search in parallel, publish the greatest index for a
    /// candidate match, and reach a barrier.  Only the partition that "wins" the race,
    /// i.e. who found the candidate with the largest index, will yield an element.
    ///
    /// </summary>
    /// <typeparam name="TSource"></typeparam>
    internal sealed class LastQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
    {
        private readonly Func<TSource, bool>? _predicate; // The optional predicate used during the search.
        private readonly bool _prematureMergeNeeded; // Whether to prematurely merge the input of this operator.
 
        //---------------------------------------------------------------------------------------
        // Initializes a new last operator.
        //
        // Arguments:
        //     child                - the child whose data we will reverse
        //
 
        internal LastQueryOperator(IEnumerable<TSource> child, Func<TSource, bool>? predicate)
            : base(child)
        {
            Debug.Assert(child != null, "child data source cannot be null");
            _predicate = predicate;
            _prematureMergeNeeded = Child.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
 
        internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TSource> childQueryResults = Child.Open(settings, false);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }
 
        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
        {
            // If the index is not at least increasing, we need to reindex.
            if (_prematureMergeNeeded)
            {
                PartitionedStream<TSource, int> intKeyStream =
                    ExecuteAndCollectResults(inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings).GetPartitionedStream();
                WrapHelper<int>(intKeyStream, recipient, settings);
            }
            else
            {
                WrapHelper<TKey>(inputStream, recipient, settings);
            }
        }
 
        private void WrapHelper<TKey>(PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            if (ParallelEnumerable.SinglePartitionMode)
                Debug.Assert(partitionCount == 1);
 
            // Generate the shared data.
            LastQueryOperatorState<TKey> operatorState = new LastQueryOperatorState<TKey>();
            CountdownEvent sharedBarrier = new CountdownEvent(partitionCount);
 
            PartitionedStream<TSource, int> outputStream =
                new PartitionedStream<TSource, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new LastQueryOperatorEnumerator<TKey>(
                    inputStream[i], _predicate, operatorState, sharedBarrier, settings.CancellationState.MergedCancellationToken,
                    inputStream.KeyComparer, i);
            }
            recipient.Receive(outputStream);
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        [ExcludeFromCodeCoverage(Justification = "This method should never be called as fallback to sequential is handled in ParallelEnumerable.First()")]
        internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
        {
            Debug.Fail("This method should never be called as fallback to sequential is handled in ParallelEnumerable.First().");
            throw new NotSupportedException();
        }
 
        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //
 
        internal override bool LimitsParallelism
        {
            get { return false; }
        }
 
        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the last operation.
        //
 
        private sealed class LastQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
        {
            private readonly QueryOperatorEnumerator<TSource, TKey> _source; // The data source to enumerate.
            private readonly Func<TSource, bool>? _predicate; // The optional predicate used during the search.
            private bool _alreadySearched; // Set once the enumerator has performed the search.
            private readonly int _partitionId; // ID of this partition
 
            // Data shared among partitions.
            private readonly LastQueryOperatorState<TKey> _operatorState; // The current last candidate and its partition id.
            private readonly CountdownEvent _sharedBarrier; // Shared barrier, signaled when partitions find their 1st element.
            private readonly CancellationToken _cancellationToken; // Token used to cancel this operator.
            private readonly IComparer<TKey> _keyComparer; // Comparer for the order keys
 
            //---------------------------------------------------------------------------------------
            // Instantiates a new enumerator.
            //
 
            internal LastQueryOperatorEnumerator(
                QueryOperatorEnumerator<TSource, TKey> source, Func<TSource, bool>? predicate,
                LastQueryOperatorState<TKey> operatorState, CountdownEvent sharedBarrier, CancellationToken cancelToken,
                IComparer<TKey> keyComparer, int partitionId)
            {
                Debug.Assert(source != null);
                Debug.Assert(operatorState != null);
                Debug.Assert(sharedBarrier != null);
                Debug.Assert(keyComparer != null);
 
                _source = source;
                _predicate = predicate;
                _operatorState = operatorState;
                _sharedBarrier = sharedBarrier;
                _cancellationToken = cancelToken;
                _keyComparer = keyComparer;
                _partitionId = partitionId;
            }
 
            //---------------------------------------------------------------------------------------
            // Straightforward IEnumerator<T> methods.
            //
 
            internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource currentElement, ref int currentKey)
            {
                Debug.Assert(_source != null);
 
                if (_alreadySearched)
                {
                    return false;
                }
 
                // Look for the greatest element.
                TSource candidate = default(TSource)!;
                TKey candidateKey = default(TKey)!;
                bool candidateFound = false;
                try
                {
                    int loopCount = 0; //counter to help with cancellation
                    TSource value = default(TSource)!;
                    TKey key = default(TKey)!;
                    while (_source.MoveNext(ref value!, ref key))
                    {
                        if ((loopCount & CancellationState.POLL_INTERVAL) == 0)
                            _cancellationToken.ThrowIfCancellationRequested();
 
                        // If the predicate is null or the current element satisfies it, we will remember
                        // it as the current partition's candidate for the last element, and move on.
                        if (_predicate == null || _predicate(value))
                        {
                            candidate = value;
                            candidateKey = key;
                            candidateFound = true;
                        }
 
                        loopCount++;
                    }
 
                    // If we found a candidate element, try to publish it, so long as it's greater.
                    if (candidateFound)
                    {
                        lock (_operatorState)
                        {
                            if (_operatorState._partitionId == -1 || _keyComparer.Compare(candidateKey, _operatorState._key) > 0)
                            {
                                _operatorState._partitionId = _partitionId;
                                _operatorState._key = candidateKey;
                            }
                        }
                    }
                }
                finally
                {
                    // No matter whether we exit due to an exception or normal completion, we must ensure
                    // that we signal other partitions that we have completed.  Otherwise, we can cause deadlocks.
                    _sharedBarrier.Signal();
                }
 
                _alreadySearched = true;
 
                // Only if we have a candidate do we wait.
                if (_partitionId == _operatorState._partitionId)
                {
                    if (!ParallelEnumerable.SinglePartitionMode)
                        _sharedBarrier.Wait(_cancellationToken);
 
                    // Now re-read the shared index. If it's the same as ours, we won and return true.
                    if (_operatorState._partitionId == _partitionId)
                    {
                        currentElement = candidate;
                        currentKey = 0; // 1st (and only) element, so we hardcode the output index to 0.
                        return true;
                    }
                }
 
                // If we got here, we didn't win. Return false.
                return false;
            }
 
            protected override void Dispose(bool disposing)
            {
                _source.Dispose();
            }
        }
 
 
        private sealed class LastQueryOperatorState<TKey>
        {
            internal TKey _key = default!;
            internal int _partitionId = -1;
        }
    }
}