File: System\Linq\Parallel\Scheduling\OrderPreservingPipeliningSpoolingTask.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingPipeliningSpoolingTask.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Linq.Parallel
{
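    /// <summary>
    /// The producer side of the order-preserving pipelining merge. One task is created per
    /// partition; each task enumerates its partition and publishes (key, element) pairs into a
    /// per-partition buffer that the order-preserving merge consumer drains.
    /// </summary>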
#if !FEATURE_WASM_MANAGED_THREADS
    [System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
#endif
    internal sealed class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
    {
        private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
        private readonly QueryOperatorEnumerator<TOutput, TKey> _partition; // The source partition.
        private readonly bool[] _consumerWaiting; // Whether a consumer is waiting on a particular producer
        private readonly bool[] _producerWaiting; // Whether a particular producer is waiting on the consumer
        private readonly bool[] _producerDone; // Whether each producer is done
        private readonly int _partitionIndex; // Index of the partition owned by this task.
 
        private readonly Queue<Pair<TKey, TOutput>>[] _buffers; // The buffer for the results
        private readonly object _bufferLock; // A lock for the buffer
 
        /// <summary>
        /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
        /// If false, the producer will make each result available to the consumer immediately after it is
        /// produced.
        /// </summary>
        private readonly bool _autoBuffered;
 
        /// <summary>
        /// The number of elements to accumulate on the producer before copying the elements to the
        /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
        ///
        /// Experimentally, 16 appears to be a sufficient buffer size to compensate for the
        /// synchronization cost.
        /// </summary>
        private const int PRODUCER_BUFFER_AUTO_SIZE = 16;
 
        /// <summary>
        /// Constructor
        /// </summary>
        internal OrderPreservingPipeliningSpoolingTask(
            QueryOperatorEnumerator<TOutput, TKey> partition,
            QueryTaskGroupState taskGroupState,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            int partitionIndex,
            Queue<Pair<TKey, TOutput>>[] buffers,
            object bufferLock,
            bool autoBuffered)
            : base(partitionIndex, taskGroupState)
        {
            Debug.Assert(partition != null);
            Debug.Assert(taskGroupState != null);
            Debug.Assert(consumerWaiting != null);
            Debug.Assert(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
            Debug.Assert(producerDone != null && producerDone.Length == consumerWaiting.Length);
            Debug.Assert(buffers != null && buffers.Length == consumerWaiting.Length);
            Debug.Assert(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);
 
            _partition = partition;
            _taskGroupState = taskGroupState;
            _producerDone = producerDone;
            _consumerWaiting = consumerWaiting;
            _producerWaiting = producerWaiting;
            _partitionIndex = partitionIndex;
            _buffers = buffers;
            _bufferLock = bufferLock;
            _autoBuffered = autoBuffered;
        }
 
        /// <summary>
        /// This method is responsible for enumerating results and enqueuing them to
        /// the output buffer as appropriate.  Each derived class implements its own.
        /// </summary>
        protected override void SpoolingWork()
        {
            TOutput element = default(TOutput)!;
            TKey key = default(TKey)!;
 
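            // In AutoBuffered mode, accumulate up to PRODUCER_BUFFER_AUTO_SIZE results locally
            // before taking the buffer lock; otherwise, publish each result individually.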
            int chunkSize = _autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
            Pair<TKey, TOutput>[] chunk = new Pair<TKey, TOutput>[chunkSize];
            var partition = _partition;
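            // The merged token is signaled by both external cancellation and PLINQ's internal
            // cancellation (e.g., when another task in the query faults).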
            CancellationToken cancelToken = _taskGroupState.CancellationState.MergedCancellationToken;
 
            int lastChunkSize;
            do
            {
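                // Fill the local chunk from the partition without holding the buffer lock.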
                lastChunkSize = 0;
                while (lastChunkSize < chunkSize && partition.MoveNext(ref element!, ref key))
                {
                    chunk[lastChunkSize] = new Pair<TKey, TOutput>(key, element);
                    lastChunkSize++;
                }
 
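                // No elements were produced, so the partition is exhausted.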
                if (lastChunkSize == 0) break;
 
                lock (_bufferLock)
                {
                    // Check if the query has been cancelled.
                    if (cancelToken.IsCancellationRequested)
                    {
                        break;
                    }
 
                    for (int i = 0; i < lastChunkSize; i++)
                    {
                        _buffers[_partitionIndex].Enqueue(chunk[i]);
                    }
 
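                    // Wake the consumer if it is blocked waiting for results from this partition.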
                    if (_consumerWaiting[_partitionIndex])
                    {
                        Monitor.Pulse(_bufferLock);
                        _consumerWaiting[_partitionIndex] = false;
                    }
 
                    // If the producer buffer is too large, wait.
                    // Note: we already checked for cancellation after acquiring the lock on this producer.
                    // That guarantees that the consumer will eventually wake up the producer.
                    if (_buffers[_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper<TOutput, TKey>.MAX_BUFFER_SIZE)
                    {
                        _producerWaiting[_partitionIndex] = true;
                        Monitor.Wait(_bufferLock);
                    }
                }
            } while (lastChunkSize == chunkSize); // A short chunk means MoveNext returned false: the partition is exhausted.
        }
 
 
        /// <summary>
        /// Creates and begins execution of a new set of spooling tasks.
        /// </summary>
        public static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TOutput, TKey> partitions,
            bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
            Queue<Pair<TKey, TOutput>>[] buffers, object[] bufferLocks,
            TaskScheduler taskScheduler, bool autoBuffered)
        {
            Debug.Assert(groupState != null);
            Debug.Assert(partitions != null);
            Debug.Assert(producerDone != null && producerDone.Length == partitions.PartitionCount);
            Debug.Assert(buffers != null && buffers.Length == partitions.PartitionCount);
            Debug.Assert(bufferLocks != null);
 
            int degreeOfParallelism = partitions.PartitionCount;
 
            // Initialize the buffers and buffer locks.
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                buffers[i] = new Queue<Pair<TKey, TOutput>>(OrderPreservingPipeliningMergeHelper<TOutput, TKey>.INITIAL_BUFFER_SIZE);
                bufferLocks[i] = new object();
            }
 
            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
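                    // Create one producer task per partition and start it on the given scheduler.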
                    for (int i = 0; i < degreeOfParallelism; i++)
                    {
                        QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput, TKey>(
                            partitions[i], groupState, consumerWaiting, producerWaiting,
                            producerDone, i, buffers, bufferLocks[i], autoBuffered);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });
 
            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);
 
            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);
            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }
 
        /// <summary>
        /// Dispose the underlying enumerator and wake up the consumer if necessary.
        /// </summary>
        protected override void SpoolingFinally()
        {
            // Let the consumer know that this producer is done.
            lock (_bufferLock)
            {
                _producerDone[_partitionIndex] = true;
                if (_consumerWaiting[_partitionIndex])
                {
                    Monitor.Pulse(_bufferLock);
                    _consumerWaiting[_partitionIndex] = false;
                }
            }
 
            // Call the base implementation.
            base.SpoolingFinally();
 
            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            _partition.Dispose();
        }
    }
}