|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DefaultIfEmptyQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
namespace System.Linq.Parallel
{
/// <summary>
/// This operator just exposes elements directly from the underlying data source, if
/// it's not empty, or yields a single default element if the data source is empty.
/// There is a minimal amount of synchronization at the beginning, until all partitions
/// have registered whether their stream is empty or not. Once the 0th partition knows
/// that at least one other partition is non-empty, it may proceed. Otherwise, it is
/// the 0th partition which yields the default value.
/// </summary>
/// <typeparam name="TSource"></typeparam>
internal sealed class DefaultIfEmptyQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
{
private readonly TSource _defaultValue; // The default value to use (if empty).
//---------------------------------------------------------------------------------------
// Initializes a new reverse operator.
//
// Arguments:
// child - the child whose data we will reverse
//
internal DefaultIfEmptyQueryOperator(IEnumerable<TSource> child, TSource defaultValue)
: base(child)
{
Debug.Assert(child != null, "child data source cannot be null");
_defaultValue = defaultValue;
SetOrdinalIndexState(ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Correct));
}
//---------------------------------------------------------------------------------------
// Just opens the current operator, including opening the child and wrapping it with
// partitions as needed.
//
internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
{
// We just open the child operator.
QueryResults<TSource> childQueryResults = Child.Open(settings, preferStriping);
return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
}
internal override void WrapPartitionedStream<TKey>(
PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);
// Generate the shared data.
Shared<int> sharedEmptyCount = new Shared<int>(0);
CountdownEvent sharedLatch = new CountdownEvent(partitionCount - 1);
PartitionedStream<TSource, TKey> outputStream =
new PartitionedStream<TSource, TKey>(partitionCount, inputStream.KeyComparer, OrdinalIndexState);
for (int i = 0; i < partitionCount; i++)
{
outputStream[i] = new DefaultIfEmptyQueryOperatorEnumerator<TKey>(
inputStream[i], _defaultValue, i, partitionCount, sharedEmptyCount, sharedLatch, settings.CancellationState.MergedCancellationToken);
}
recipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
{
return Child.AsSequentialQuery(token).DefaultIfEmpty(_defaultValue);
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge that would not be performed in
// a similar sequential operation (i.e., in LINQ to Objects).
//
internal override bool LimitsParallelism
{
get { return false; }
}
//---------------------------------------------------------------------------------------
// The enumerator type responsible for executing the default-if-empty operation.
//
private sealed class DefaultIfEmptyQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, TKey>
{
private readonly QueryOperatorEnumerator<TSource, TKey> _source; // The data source to enumerate.
private bool _lookedForEmpty; // Whether this partition has looked for empty yet.
private readonly int _partitionIndex; // This enumerator's partition index.
private readonly int _partitionCount; // The number of partitions.
private readonly TSource _defaultValue; // The default value if the 0th partition is empty.
// Data shared among partitions.
private readonly Shared<int> _sharedEmptyCount; // The number of empty partitions.
private readonly CountdownEvent _sharedLatch; // Shared latch, signaled when partitions process the 1st item.
private readonly CancellationToken _cancelToken; // Token used to cancel this operator.
//---------------------------------------------------------------------------------------
// Instantiates a new select enumerator.
//
internal DefaultIfEmptyQueryOperatorEnumerator(
QueryOperatorEnumerator<TSource, TKey> source, TSource defaultValue, int partitionIndex, int partitionCount,
Shared<int> sharedEmptyCount, CountdownEvent sharedLatch, CancellationToken cancelToken)
{
Debug.Assert(source != null);
Debug.Assert(0 <= partitionIndex && partitionIndex < partitionCount);
Debug.Assert(partitionCount > 0);
Debug.Assert(sharedEmptyCount != null);
Debug.Assert(sharedLatch != null);
_source = source;
_defaultValue = defaultValue;
_partitionIndex = partitionIndex;
_partitionCount = partitionCount;
_sharedEmptyCount = sharedEmptyCount;
_sharedLatch = sharedLatch;
_cancelToken = cancelToken;
}
//---------------------------------------------------------------------------------------
// Straightforward IEnumerator<T> methods.
//
internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource currentElement, [AllowNull] ref TKey currentKey)
{
Debug.Assert(_source != null);
bool moveNextResult = _source.MoveNext(ref currentElement!, ref currentKey);
// There is special logic the first time this function is called.
if (!_lookedForEmpty)
{
// Ensure we don't enter this loop again.
_lookedForEmpty = true;
if (!moveNextResult)
{
if (ParallelEnumerable.SinglePartitionMode)
{
currentElement = _defaultValue;
currentKey = default(TKey)!;
return true;
}
else if (_partitionIndex == 0)
{
// If this is the 0th partition, we must wait for all others. Note: we could
// actually do a wait-any here: if at least one other partition finds an element,
// there is strictly no need to wait. But this would require extra coordination
// which may or may not be worth the trouble.
_sharedLatch.Wait(_cancelToken);
_sharedLatch.Dispose();
// Now see if there were any other partitions with data.
if (_sharedEmptyCount.Value == _partitionCount - 1)
{
// No data, we will yield the default value.
currentElement = _defaultValue;
currentKey = default(TKey)!;
return true;
}
else
{
// Another partition has data, we are done.
return false;
}
}
else
{
// Not the 0th partition, we will increment the shared empty counter.
Interlocked.Increment(ref _sharedEmptyCount.Value);
}
}
// Every partition (but the 0th) will signal the latch the first time.
if (_partitionIndex != 0)
{
_sharedLatch.Signal();
}
}
return moveNextResult;
}
protected override void Dispose(bool disposing)
{
_source.Dispose();
}
}
}
}
|