File: System\Linq\Parallel\QueryOperators\Unary\DefaultIfEmptyQueryOperator.cs
Web Access
Project: src\src\libraries\System.Linq.Parallel\src\System.Linq.Parallel.csproj (System.Linq.Parallel)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DefaultIfEmptyQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// This operator just exposes elements directly from the underlying data source, if
    /// it's not empty, or yields a single default element if the data source is empty.
    /// There is a minimal amount of synchronization at the beginning, until all partitions
    /// have registered whether their stream is empty or not. Once the 0th partition knows
    /// that at least one other partition is non-empty, it may proceed. Otherwise, it is
    /// the 0th partition which yields the default value.
    /// </summary>
    /// <typeparam name="TSource"></typeparam>
    internal sealed class DefaultIfEmptyQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
    {
        private readonly TSource _defaultValue; // The default value to use (if empty).
 
        //---------------------------------------------------------------------------------------
        // Initializes a new reverse operator.
        //
        // Arguments:
        //     child                - the child whose data we will reverse
        //
 
        internal DefaultIfEmptyQueryOperator(IEnumerable<TSource> child, TSource defaultValue)
            : base(child)
        {
            Debug.Assert(child != null, "child data source cannot be null");
            _defaultValue = defaultValue;
            SetOrdinalIndexState(ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Correct));
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
 
        internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator.
            QueryResults<TSource> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }
 
        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            if (ParallelEnumerable.SinglePartitionMode)
                Debug.Assert(partitionCount == 1);
 
            // Generate the shared data.
            Shared<int> sharedEmptyCount = new Shared<int>(0);
            CountdownEvent sharedLatch = new CountdownEvent(partitionCount - 1);
 
            PartitionedStream<TSource, TKey> outputStream =
                new PartitionedStream<TSource, TKey>(partitionCount, inputStream.KeyComparer, OrdinalIndexState);
 
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new DefaultIfEmptyQueryOperatorEnumerator<TKey>(
                    inputStream[i], _defaultValue, i, partitionCount, sharedEmptyCount, sharedLatch, settings.CancellationState.MergedCancellationToken);
            }
 
            recipient.Receive(outputStream);
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
        {
            return Child.AsSequentialQuery(token).DefaultIfEmpty(_defaultValue);
        }
 
        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //
 
        internal override bool LimitsParallelism
        {
            get { return false; }
        }
 
        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the default-if-empty operation.
        //
 
        private sealed class DefaultIfEmptyQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, TKey>
        {
            private readonly QueryOperatorEnumerator<TSource, TKey> _source; // The data source to enumerate.
            private bool _lookedForEmpty; // Whether this partition has looked for empty yet.
            private readonly int _partitionIndex; // This enumerator's partition index.
            private readonly int _partitionCount; // The number of partitions.
            private readonly TSource _defaultValue; // The default value if the 0th partition is empty.
 
            // Data shared among partitions.
            private readonly Shared<int> _sharedEmptyCount; // The number of empty partitions.
 
            private readonly CountdownEvent _sharedLatch; // Shared latch, signaled when partitions process the 1st item.
            private readonly CancellationToken _cancelToken; // Token used to cancel this operator.
 
            //---------------------------------------------------------------------------------------
            // Instantiates a new select enumerator.
            //
 
            internal DefaultIfEmptyQueryOperatorEnumerator(
                QueryOperatorEnumerator<TSource, TKey> source, TSource defaultValue, int partitionIndex, int partitionCount,
                Shared<int> sharedEmptyCount, CountdownEvent sharedLatch, CancellationToken cancelToken)
            {
                Debug.Assert(source != null);
                Debug.Assert(0 <= partitionIndex && partitionIndex < partitionCount);
                Debug.Assert(partitionCount > 0);
                Debug.Assert(sharedEmptyCount != null);
                Debug.Assert(sharedLatch != null);
 
                _source = source;
                _defaultValue = defaultValue;
                _partitionIndex = partitionIndex;
                _partitionCount = partitionCount;
                _sharedEmptyCount = sharedEmptyCount;
                _sharedLatch = sharedLatch;
                _cancelToken = cancelToken;
            }
 
            //---------------------------------------------------------------------------------------
            // Straightforward IEnumerator<T> methods.
            //
 
            internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource currentElement, [AllowNull] ref TKey currentKey)
            {
                Debug.Assert(_source != null);
 
                bool moveNextResult = _source.MoveNext(ref currentElement!, ref currentKey);
 
                // There is special logic the first time this function is called.
                if (!_lookedForEmpty)
                {
                    // Ensure we don't enter this loop again.
                    _lookedForEmpty = true;
 
                    if (!moveNextResult)
                    {
                        if (ParallelEnumerable.SinglePartitionMode)
                        {
                            currentElement = _defaultValue;
                            currentKey = default(TKey)!;
                            return true;
                        }
                        else if (_partitionIndex == 0)
                        {
                            // If this is the 0th partition, we must wait for all others.  Note: we could
                            // actually do a wait-any here: if at least one other partition finds an element,
                            // there is strictly no need to wait.  But this would require extra coordination
                            // which may or may not be worth the trouble.
                            _sharedLatch.Wait(_cancelToken);
                            _sharedLatch.Dispose();
 
                            // Now see if there were any other partitions with data.
                            if (_sharedEmptyCount.Value == _partitionCount - 1)
                            {
                                // No data, we will yield the default value.
                                currentElement = _defaultValue;
                                currentKey = default(TKey)!;
                                return true;
                            }
                            else
                            {
                                // Another partition has data, we are done.
                                return false;
                            }
                        }
                        else
                        {
                            // Not the 0th partition, we will increment the shared empty counter.
                            Interlocked.Increment(ref _sharedEmptyCount.Value);
                        }
                    }
 
                    // Every partition (but the 0th) will signal the latch the first time.
                    if (_partitionIndex != 0)
                    {
                        _sharedLatch.Signal();
                    }
                }
 
                return moveNextResult;
            }
 
            protected override void Dispose(bool disposing)
            {
                _source.Dispose();
            }
        }
    }
}