File: System\IO\Pipelines\Pipe.cs
Web Access
Project: src\src\libraries\System.IO.Pipelines\src\System.IO.Pipelines.csproj (System.IO.Pipelines)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
 
namespace System.IO.Pipelines
{
    /// <summary>The default <see cref="System.IO.Pipelines.PipeWriter" /> and <see cref="System.IO.Pipelines.PipeReader" /> implementation.</summary>
    public sealed partial class Pipe
    {
        // Cancellation callbacks: the state object is the owning Pipe, which is cast back and asked to
        // handle a reader or writer cancellation request respectively.
        private static readonly Action<object?> s_signalReaderAwaitable = state => ((Pipe)state!).ReaderCancellationRequested();
        private static readonly Action<object?> s_signalWriterAwaitable = state => ((Pipe)state!).WriterCancellationRequested();
        // Executes a PipeCompletionCallbacks instance that is passed in as the state object
        private static readonly Action<object?> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state!).Execute();

        // These callbacks all point to the same methods but are different delegate types
        private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext!;
        private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext!;
        private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext!;
        private static readonly Action<object?> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext!;

        // Mutable struct! Don't make this readonly
        private BufferSegmentStack _bufferSegmentPool;

        // The reader and writer facades handed to ValueTask as the source of asynchronous results
        private readonly DefaultPipeReader _reader;
        private readonly DefaultPipeWriter _writer;

        // The options instance
        private readonly PipeOptions _options;
        private readonly object _sync = new object();

        // Computed state from the options instance
        private bool UseSynchronizationContext => _options.UseSynchronizationContext;
        private int MinimumSegmentSize => _options.MinimumSegmentSize;
        private long PauseWriterThreshold => _options.PauseWriterThreshold;
        private long ResumeWriterThreshold => _options.ResumeWriterThreshold;

        private PipeScheduler ReaderScheduler => _options.ReaderScheduler;
        private PipeScheduler WriterScheduler => _options.WriterScheduler;

        // This sync object protects the shared state between the writer and reader (most of this class)
        private object SyncObj => _sync;

        // The number of bytes flushed but not consumed by the reader
        private long _unconsumedBytes;

        // The number of bytes written but not flushed
        private long _unflushedBytes;

        // Awaitables behind pending read (ReadAsync/ReadAtLeastAsync) and flush (FlushAsync) operations
        private PipeAwaitable _readerAwaitable;
        private PipeAwaitable _writerAwaitable;

        // Completion state of the writer side and of the reader side
        private PipeCompletion _writerCompletion;
        private PipeCompletion _readerCompletion;

        // Stores the last examined position, used to calculate how many bytes to release
        // for back pressure management
        private long _lastExaminedIndex = -1;

        // The read head which is the start of the PipeReader's consumed bytes
        private BufferSegment? _readHead;
        private int _readHeadIndex;

        private bool _disposed;

        // The extent of the bytes available to the PipeReader to consume
        private BufferSegment? _readTail;
        private int _readTailIndex;
        // Set by ReadAtLeastAsync; a commit does not resume the reader while fewer unconsumed bytes than this are available
        private int _minimumReadBytes;

        // The write head which is the extent of the PipeWriter's written bytes
        private BufferSegment? _writingHead;
        // The not-yet-written remainder of the write head's memory (sliced forward on every Advance)
        private Memory<byte> _writingHeadMemory;
        // Bytes advanced into the write head that have not yet been added to the segment's End
        private int _writingHeadBytesBuffered;

        // Determines what current operation is in flight (reading/writing)
        private PipeOperationState _operationState;

        internal long Length => _unconsumedBytes;
 
        /// <summary>Creates a <see cref="System.IO.Pipelines.Pipe" /> configured with <see cref="System.IO.Pipelines.PipeOptions.Default" />.</summary>
        public Pipe()
            : this(PipeOptions.Default)
        {
            // All initialization happens in the options-taking constructor.
        }
 
        /// <summary>Creates a <see cref="System.IO.Pipelines.Pipe" /> configured with the supplied options.</summary>
        /// <param name="options">The options that control pooling, scheduling and back pressure for this pipe.</param>
        public Pipe(PipeOptions options)
        {
            if (options is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
            }

            // Store the options first: UseSynchronizationContext (used below) reads from _options.
            _options = options;
            _bufferSegmentPool = new BufferSegmentStack(options.InitialSegmentPoolSize);

            // No operation is in flight and neither side has completed.
            _operationState = default;
            _readerCompletion = default;
            _writerCompletion = default;

            // The reader awaitable starts uncompleted, the writer awaitable starts completed.
            _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);
            _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);

            _reader = new DefaultPipeReader(this);
            _writer = new DefaultPipeWriter(this);
        }
 
        /// <summary>
        /// Puts the completion objects, awaitables, read indexes and byte counters back to the values
        /// they have on a freshly constructed pipe.
        /// </summary>
        private void ResetState()
        {
            // Completion state of both sides
            _readerCompletion.Reset();
            _writerCompletion.Reset();

            // Same initial awaitable states as in the constructor: reader uncompleted, writer completed
            _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);
            _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);

            // Positions
            _readTailIndex = 0;
            _readHeadIndex = 0;
            _lastExaminedIndex = -1;

            // Byte accounting
            _unflushedBytes = 0L;
            _unconsumedBytes = 0L;
        }
 
        /// <summary>
        /// Returns the current write-head memory after making sure a write head exists that can
        /// accommodate <paramref name="sizeHint"/>.
        /// </summary>
        /// <param name="sizeHint">The requested minimum size; must not be negative.</param>
        internal Memory<byte> GetMemory(int sizeHint)
        {
            // A completed writer is reported before a bad size hint.
            if (_writerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
            }

            if (sizeHint < 0)
            {
                ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
            }

            AllocateWriteHeadIfNeeded(sizeHint);

            Memory<byte> writable = _writingHeadMemory;
            return writable;
        }
 
        /// <summary>
        /// Span-returning counterpart of <see cref="GetMemory(int)"/>: performs the same validation and
        /// write-head allocation, then exposes the write-head memory as a span.
        /// </summary>
        /// <param name="sizeHint">The requested minimum size; must not be negative.</param>
        internal Span<byte> GetSpan(int sizeHint)
        {
            // A completed writer is reported before a bad size hint.
            if (_writerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
            }

            if (sizeHint < 0)
            {
                ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
            }

            AllocateWriteHeadIfNeeded(sizeHint);

            Memory<byte> writable = _writingHeadMemory;
            return writable.Span;
        }
 
        /// <summary>
        /// Fast path for obtaining write memory: only falls back to the locked allocation path when
        /// writing is not yet marked active or the current write-head memory is empty or too small.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void AllocateWriteHeadIfNeeded(int sizeHint)
        {
            // While writing is active and there is enough room, the lock can be skipped entirely.
            // IsWritingActive is what stops the reader from releasing the writer's memory once it has
            // consumed everything written so far.
            bool currentMemorySuffices =
                _operationState.IsWritingActive &&
                _writingHeadMemory.Length != 0 &&
                _writingHeadMemory.Length >= sizeHint;

            if (currentMemorySuffices)
            {
                return;
            }

            AllocateWriteHeadSynchronized(sizeHint);
        }
 
        /// <summary>
        /// Slow path of <see cref="AllocateWriteHeadIfNeeded(int)"/>: under the pipe lock, marks a write as
        /// in progress and makes sure the write head has memory that can accommodate <paramref name="sizeHint"/>,
        /// either by creating the first segment, re-renting memory for an empty write head, or chaining a new segment.
        /// </summary>
        private void AllocateWriteHeadSynchronized(int sizeHint)
        {
            lock (SyncObj)
            {
                // Flag the write as active (see AllocateWriteHeadIfNeeded for why the reader cares)
                _operationState.BeginWrite();

                if (_writingHead == null)
                {
                    // We need to allocate memory to write since nobody has written before
                    BufferSegment newSegment = AllocateSegment(sizeHint);

                    // Set all the pointers
                    _writingHead = _readHead = _readTail = newSegment;
                    _lastExaminedIndex = 0;
                }
                else
                {
                    int bytesLeftInBuffer = _writingHeadMemory.Length;

                    // Only replace the write memory when it is exhausted or smaller than requested
                    if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
                    {
                        if (_writingHeadBytesBuffered > 0)
                        {
                            // Flush buffered data to the segment
                            _writingHead.End += _writingHeadBytesBuffered;
                            _writingHeadBytesBuffered = 0;
                        }

                        if (_writingHead.Length == 0)
                        {
                            // If we got here that means Advance was called with 0 bytes or GetMemory was called again without any writes occurring
                            // And, the newly requested memory size is greater than our unused segments internal memory buffer
                            // So we should reuse the BufferSegment and replace the memory it's holding, this way ReadAsync will not receive a buffer with one segment being empty
                            _writingHead.ResetMemory();
                            RentMemory(_writingHead, sizeHint);
                        }
                        else
                        {
                            // The current segment holds data, so chain a fresh segment after it and write there
                            BufferSegment newSegment = AllocateSegment(sizeHint);

                            _writingHead.SetNext(newSegment);
                            _writingHead = newSegment;
                        }
                    }
                }
            }
        }
 
        /// <summary>
        /// Obtains a segment (pooled or new) and rents memory for it based on <paramref name="sizeHint"/>.
        /// </summary>
        /// <returns>The segment, with memory attached by <see cref="RentMemory(BufferSegment, int)"/>.</returns>
        private BufferSegment AllocateSegment(int sizeHint)
        {
            BufferSegment segment = CreateSegmentUnsynchronized();
            RentMemory(segment, sizeHint);
            return segment;
        }
 
        /// <summary>
        /// Rents backing memory for <paramref name="segment"/> and makes the segment's available memory
        /// the current write-head memory. The configured pool is used when the request fits within its
        /// maximum buffer size; otherwise the shared array pool is used.
        /// </summary>
        private void RentMemory(BufferSegment segment, int sizeHint)
        {
            // Segment should be new or reset, otherwise a memory leak could occur
            Debug.Assert(segment.MemoryOwner is null);
            Debug.Assert(sizeHint >= 0);

            // -1 means "no custom pool"; a non-negative sizeHint can never fit into that limit.
            int customPoolLimit = -1;
            MemoryPool<byte>? customPool = null;

            if (!_options.IsDefaultSharedMemoryPool)
            {
                customPool = _options.Pool;
                customPoolLimit = customPool.MaxBufferSize;
            }

            if (sizeHint > customPoolLimit)
            {
                // Too large for the configured pool (or there is none): rent from the shared array pool
                int arraySize = GetSegmentSize(sizeHint);
                byte[] rentedArray = ArrayPool<byte>.Shared.Rent(arraySize);
                segment.SetOwnedMemory(rentedArray);
            }
            else
            {
                // The request fits, so the configured pool is used. customPool cannot be null here because
                // the limit stays at -1 unless a custom pool was picked up above.
                int pooledSize = GetSegmentSize(sizeHint, customPoolLimit);
                segment.SetOwnedMemory(customPool!.Rent(pooledSize));
            }

            _writingHeadMemory = segment.AvailableMemory;
        }
 
        /// <summary>
        /// Computes the size to rent: <paramref name="sizeHint"/> raised to at least the minimum segment
        /// size and then capped at <paramref name="maxBufferSize"/>.
        /// </summary>
        private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
        {
            // Raise to the minimum first, then cap: the cap wins if the two conflict.
            int atLeastMinimum = Math.Max(MinimumSegmentSize, sizeHint);
            return Math.Min(maxBufferSize, atLeastMinimum);
        }
 
        /// <summary>
        /// Takes a segment from the segment pool if one is available; otherwise allocates a new one.
        /// Takes no lock itself.
        /// </summary>
        private BufferSegment CreateSegmentUnsynchronized()
        {
            return _bufferSegmentPool.TryPop(out BufferSegment? pooled)
                ? pooled
                : new BufferSegment();
        }
 
        /// <summary>
        /// Puts <paramref name="segment"/> back into the segment pool unless the pool already holds the
        /// configured maximum number of segments. Takes no lock itself.
        /// </summary>
        private void ReturnSegmentUnsynchronized(BufferSegment segment)
        {
            Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!");
            Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
            Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");

            bool poolIsFull = _bufferSegmentPool.Count >= _options.MaxSegmentPoolSize;
            if (poolIsFull)
            {
                // The segment is simply not pooled.
                return;
            }

            _bufferSegmentPool.Push(segment);
        }
 
        /// <summary>
        /// Ends the current write and, if any bytes were written since the last commit, makes them visible to the
        /// reader by moving the read tail to the write head. May also mark the writer awaitable as uncompleted
        /// when the pause threshold is crossed. Takes no lock itself; the callers in this file invoke it inside
        /// <c>lock (SyncObj)</c>.
        /// </summary>
        /// <returns>
        /// <see langword="true"/> if data was committed and at least <c>_minimumReadBytes</c> are now unconsumed,
        /// meaning the reader should be resumed; otherwise <see langword="false"/>.
        /// </returns>
        internal bool CommitUnsynchronized()
        {
            _operationState.EndWrite();

            if (_unflushedBytes == 0)
            {
                // Nothing written to commit
                return false;
            }

            // Update the writing head
            Debug.Assert(_writingHead != null);
            _writingHead.End += _writingHeadBytesBuffered;

            // Always move the read tail to the write head
            _readTail = _writingHead;
            _readTailIndex = _writingHead.End;

            // Remember the previous length so that only the transition across the pause threshold is acted upon
            long oldLength = _unconsumedBytes;
            _unconsumedBytes += _unflushedBytes;

            bool resumeReader = true;

            if (_unconsumedBytes < _minimumReadBytes)
            {
                // Don't yield the reader if we haven't written enough
                resumeReader = false;
            }
            // We only apply back pressure if the reader isn't paused. This is important
            // because if it is blocked then this could cause a deadlock (if resumeReader is false).
            // If we are resuming the reader, then we can look at the pause threshold to know
            // if we should pause the writer.
            else if (PauseWriterThreshold > 0 &&
                oldLength < PauseWriterThreshold &&
                _unconsumedBytes >= PauseWriterThreshold &&
                !_readerCompletion.IsCompleted)
            {
                _writerAwaitable.SetUncompleted();
            }

            // Everything written so far is now accounted for in _unconsumedBytes and the segment's End
            _unflushedBytes = 0;
            _writingHeadBytesBuffered = 0;

            return resumeReader;
        }
 
        /// <summary>
        /// Records that <paramref name="bytes"/> bytes were written into the current write-head memory.
        /// </summary>
        /// <param name="bytes">The number of bytes written; must be between 0 and the length of the current write-head memory.</param>
        internal void Advance(int bytes)
        {
            lock (SyncObj)
            {
                // The unsigned comparison rejects negative values and values past the available memory in one test.
                uint available = (uint)_writingHeadMemory.Length;
                if ((uint)bytes > available)
                {
                    ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
                }

                // If the reader is completed we no-op Advance but leave GetMemory and FlushAsync alone
                if (!_readerCompletion.IsCompleted)
                {
                    AdvanceCore(bytes);
                }
            }
        }
 
        /// <summary>
        /// Bookkeeping for a write of <paramref name="bytesWritten"/> bytes: bumps the unflushed and
        /// write-head buffered counters and slices the written bytes off the front of the write-head memory.
        /// Performs no validation; <see cref="Advance(int)"/> range-checks the value before calling this.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void AdvanceCore(int bytesWritten)
        {
            _unflushedBytes += bytesWritten;
            _writingHeadBytesBuffered += bytesWritten;
            // The remaining memory now starts right after the bytes that were just written
            _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
        }
 
        /// <summary>
        /// Commits the written bytes so the reader can see them and schedules the reader's continuation
        /// if the commit produced one.
        /// </summary>
        /// <param name="cancellationToken">Token observed by the flush; an already-canceled token yields a canceled task without touching the pipe.</param>
        /// <returns>A task-like value that is either already complete or backed by the pipe's writer.</returns>
        internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                Task<FlushResult> canceled = Task.FromCanceled<FlushResult>(cancellationToken);
                return new ValueTask<FlushResult>(canceled);
            }

            ValueTask<FlushResult> flushTask;
            CompletionData readerContinuation;

            lock (SyncObj)
            {
                PrepareFlushUnsynchronized(out readerContinuation, out flushTask, cancellationToken);
            }

            // The continuation is scheduled only after the lock has been released.
            TrySchedule(ReaderScheduler, readerContinuation);

            return flushTask;
        }
 
        /// <summary>
        /// Core of a flush: commits pending writes, begins the writer operation with the cancellation token,
        /// produces the flush result (synchronous or backed by the writer) and, when new data was committed,
        /// completes the reader awaitable. Takes no lock itself.
        /// </summary>
        /// <param name="completionData">Receives the reader continuation to schedule, or <c>default</c> if there is none.</param>
        /// <param name="result">Receives the value to return from the flush call.</param>
        /// <param name="cancellationToken">Token registered with the writer awaitable.</param>
        private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
        {
            bool wakeReader = CommitUnsynchronized();

            // AttachToken before completing reader awaiter in case cancellationToken is already completed
            _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);

            if (!_writerAwaitable.IsCompleted)
            {
                // The writer awaitable is pending, so the result is delivered asynchronously via _writer
                result = new ValueTask<FlushResult>(_writer, token: 0);
            }
            else
            {
                // The common case: the writer awaitable is complete, so hand back a completed ValueTask
                FlushResult synchronousResult = default;
                GetFlushResult(ref synchronousResult);
                result = new ValueTask<FlushResult>(synchronousResult);
            }

            // Complete reader only if new data was pushed into the pipe
            // Avoid throwing in between completing the reader and scheduling the callback
            // if the intent is to allow pipe to continue reading the data
            completionData = default;
            if (wakeReader)
            {
                _readerAwaitable.Complete(out completionData);
            }

            // I couldn't find a way for flush to induce backpressure deadlock
            // if it always adds new data to pipe and wakes up the reader but assert anyway
            Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
        }
 
        /// <summary>
        /// Completes the writer side: commits pending writes, records the completion (with an optional
        /// exception) and completes the reader awaitable. If the reader side has already completed as well,
        /// the whole pipe is completed.
        /// </summary>
        /// <param name="exception">The exception to record with the writer completion, or <see langword="null"/>.</param>
        internal void CompleteWriter(Exception? exception)
        {
            PipeCompletionCallbacks? writerCallbacks;
            CompletionData readerContinuation;
            bool readerAlreadyDone;

            lock (SyncObj)
            {
                // Commit any pending buffers
                CommitUnsynchronized();

                writerCallbacks = _writerCompletion.TryComplete(exception);
                _readerAwaitable.Complete(out readerContinuation);
                readerAlreadyDone = _readerCompletion.IsCompleted;
            }

            // Everything below runs outside the lock.
            if (readerAlreadyDone)
            {
                CompletePipe();
            }

            if (writerCallbacks != null)
            {
                ScheduleCallbacks(ReaderScheduler, writerCallbacks);
            }

            TrySchedule(ReaderScheduler, readerContinuation);
        }
 
        /// <summary>
        /// Advances the reader treating <paramref name="consumed"/> as both the consumed and the examined position.
        /// </summary>
        internal void AdvanceReader(in SequencePosition consumed)
            => AdvanceReader(consumed, consumed);
 
        /// <summary>
        /// Advances the reader to the given positions after unpacking each <see cref="SequencePosition"/>
        /// into its segment and index. Not allowed once the reader has completed.
        /// </summary>
        /// <param name="consumed">The position up to which data has been consumed.</param>
        /// <param name="examined">The position up to which data has been examined.</param>
        internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined)
        {
            // Advancing is not allowed once the reader has completed
            if (_readerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
            }

            var consumedSegment = (BufferSegment?)consumed.GetObject();
            int consumedIndex = consumed.GetInteger();
            var examinedSegment = (BufferSegment?)examined.GetObject();
            int examinedIndex = examined.GetInteger();

            AdvanceReader(consumedSegment, consumedIndex, examinedSegment, examinedIndex);
        }
 
        /// <summary>
        /// Applies the reader's consumed and examined positions: updates the unconsumed-byte accounting used for
        /// back pressure (resuming the writer when the count drops below the resume threshold), moves the read head,
        /// returns fully consumed segments to the segment pool, ends the read operation and finally schedules the
        /// writer continuation, if any, outside the lock.
        /// </summary>
        private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, BufferSegment? examinedSegment, int examinedIndex)
        {
            // Throw if examined < consumed
            if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
            {
                ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
            }

            // Range of segments [returnStart, returnEnd) that will be handed back to the segment pool
            BufferSegment? returnStart = null;
            BufferSegment? returnEnd = null;

            CompletionData completionData = default;

            lock (SyncObj)
            {
                // True only when the examined position is exactly the read tail, i.e. everything flushed so far was examined
                var examinedEverything = false;
                if (examinedSegment == _readTail)
                {
                    examinedEverything = examinedIndex == _readTailIndex;
                }

                if (examinedSegment != null && _lastExaminedIndex >= 0)
                {
                    // This can be negative resulting in _unconsumedBytes increasing, this should be safe because we've already checked that
                    // examined >= consumed above, so we can't get into a state where we un-examine too much
                    long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
                    long oldLength = _unconsumedBytes;

                    _unconsumedBytes -= examinedBytes;

                    // Store the absolute position
                    _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;

                    Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative");
                    Debug.Assert(ResumeWriterThreshold >= 1, "ResumeWriterThreshold is less than 1");

                    // Resume the writer only on the transition from at-or-above the resume threshold to below it
                    if (oldLength >= ResumeWriterThreshold &&
                        _unconsumedBytes < ResumeWriterThreshold)
                    {
                        // Should only release backpressure if we made forward progress
                        Debug.Assert(examinedBytes > 0);
                        _writerAwaitable.Complete(out completionData);
                    }
                }

                if (consumedSegment != null)
                {
                    if (_readHead == null)
                    {
                        ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
                        return;
                    }

                    returnStart = _readHead;
                    returnEnd = consumedSegment;

                    // Moves the read head (and the read tail, if it pointed at the same block) to the block after
                    // returnEnd, and extends the return range so that the old returnEnd block gets returned too
                    void MoveReturnEndToNextBlock()
                    {
                        BufferSegment? nextBlock = returnEnd!.NextSegment;
                        if (_readTail == returnEnd)
                        {
                            _readTail = nextBlock;
                            _readTailIndex = 0;
                        }

                        _readHead = nextBlock;
                        _readHeadIndex = 0;

                        returnEnd = nextBlock;
                    }

                    // The consumed position is at the very end of its segment, so that segment may be returnable
                    if (consumedIndex == returnEnd.Length)
                    {
                        // If the writing head isn't block we're about to return, then we can move to the next one
                        // and return this block safely
                        if (_writingHead != returnEnd)
                        {
                            MoveReturnEndToNextBlock();
                        }
                        // If the writing head is the same as the block to be returned, then we need to make sure
                        // there's no pending write and that there's no buffered data for the writing head
                        else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
                        {
                            // Reset the writing head to null if it's the return block and we've consumed everything
                            _writingHead = null;
                            _writingHeadMemory = default;

                            MoveReturnEndToNextBlock();
                        }
                        else
                        {
                            // The writer is still using this block: keep it and just record the new read position
                            _readHead = consumedSegment;
                            _readHeadIndex = consumedIndex;
                        }
                    }
                    else
                    {
                        // Part of the consumed segment is still unread: keep it as the read head
                        _readHead = consumedSegment;
                        _readHeadIndex = consumedIndex;
                    }
                }

                // We reset the awaitable to not completed if we've examined everything the producer produced so far
                // but only if writer is not completed yet
                if (examinedEverything && !_writerCompletion.IsCompleted)
                {
                    Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync isn't completed and will deadlock");

                    _readerAwaitable.SetUncompleted();
                }

                // Reset and pool every segment from the old read head up to, but not including, returnEnd
                while (returnStart != null && returnStart != returnEnd)
                {
                    BufferSegment? next = returnStart.NextSegment;
                    returnStart.Reset();
                    ReturnSegmentUnsynchronized(returnStart);
                    returnStart = next;
                }

                _operationState.EndRead();
            }

            // Scheduled outside the lock; does nothing when no writer continuation was captured above
            TrySchedule(WriterScheduler, completionData);
        }
 
        /// <summary>
        /// Completes the reader side: ends any read still in progress, records the completion (with an
        /// optional exception) and completes the writer awaitable. If the writer side has already completed
        /// as well, the whole pipe is completed.
        /// </summary>
        /// <param name="exception">The exception to record with the reader completion, or <see langword="null"/>.</param>
        internal void CompleteReader(Exception? exception)
        {
            PipeCompletionCallbacks? readerCallbacks;
            CompletionData writerContinuation;
            bool writerAlreadyDone;

            lock (SyncObj)
            {
                // If a read is still in progress, clean up that state before continuing
                if (_operationState.IsReadingActive)
                {
                    _operationState.EndRead();
                }

                // REVIEW: We should consider cleaning up all of the allocated memory
                // on the reader side now.

                readerCallbacks = _readerCompletion.TryComplete(exception);
                _writerAwaitable.Complete(out writerContinuation);
                writerAlreadyDone = _writerCompletion.IsCompleted;
            }

            // Everything below runs outside the lock.
            if (writerAlreadyDone)
            {
                CompletePipe();
            }

            if (readerCallbacks != null)
            {
                ScheduleCallbacks(WriterScheduler, readerCallbacks);
            }

            TrySchedule(WriterScheduler, writerContinuation);
        }
 
        /// <summary>
        /// Registers <paramref name="callback"/> with the writer completion. If the registration hands back
        /// callbacks to run, they are scheduled on the reader scheduler outside the lock.
        /// </summary>
        /// <param name="callback">The callback to register; must not be <see langword="null"/>.</param>
        /// <param name="state">The state object registered together with the callback.</param>
        internal void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
        {
            if (callback is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
            }

            PipeCompletionCallbacks? callbacksToRun;
            lock (SyncObj)
            {
                callbacksToRun = _writerCompletion.AddCallback(callback, state);
            }

            if (callbacksToRun is null)
            {
                return;
            }

            ScheduleCallbacks(ReaderScheduler, callbacksToRun);
        }
 
        /// <summary>
        /// Cancels the reader awaitable under the lock, then schedules the resulting continuation (if any)
        /// on the reader scheduler.
        /// </summary>
        internal void CancelPendingRead()
        {
            CompletionData readerContinuation;

            lock (SyncObj)
            {
                _readerAwaitable.Cancel(out readerContinuation);
            }

            TrySchedule(ReaderScheduler, readerContinuation);
        }
 
        /// <summary>
        /// Cancels the writer awaitable under the lock, then schedules the resulting continuation (if any)
        /// on the writer scheduler.
        /// </summary>
        internal void CancelPendingFlush()
        {
            CompletionData writerContinuation;

            lock (SyncObj)
            {
                _writerAwaitable.Cancel(out writerContinuation);
            }

            TrySchedule(WriterScheduler, writerContinuation);
        }
 
        /// <summary>
        /// Registers <paramref name="callback"/> with the reader completion. If the registration hands back
        /// callbacks to run, they are scheduled on the writer scheduler outside the lock.
        /// </summary>
        /// <param name="callback">The callback to register; must not be <see langword="null"/>.</param>
        /// <param name="state">The state object registered together with the callback.</param>
        internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
        {
            if (callback is null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
            }

            PipeCompletionCallbacks? callbacksToRun;
            lock (SyncObj)
            {
                callbacksToRun = _readerCompletion.AddCallback(callback, state);
            }

            if (callbacksToRun is null)
            {
                return;
            }

            ScheduleCallbacks(WriterScheduler, callbacksToRun);
        }
 
        /// <summary>
        /// Starts a read that completes synchronously only if at least <paramref name="minimumBytes"/> unconsumed bytes
        /// are available or the read result is canceled or completed. Otherwise it records the minimum, resumes a
        /// paused writer so that more data can arrive, and returns a task-like value backed by the pipe's reader.
        /// </summary>
        /// <param name="minimumBytes">The number of unconsumed bytes required for the synchronous fast path.</param>
        /// <param name="token">Token registered with the reader awaitable; an already-canceled token yields a canceled task.</param>
        internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token)
        {
            if (_readerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
            }

            if (token.IsCancellationRequested)
            {
                return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
            }

            // Stays default unless a paused writer has to be resumed below
            CompletionData completionData = default;
            ValueTask<ReadResult> result;
            lock (SyncObj)
            {
                _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);

                // If the awaitable is already complete then return the value result directly
                if (_readerAwaitable.IsCompleted)
                {
                    GetReadResult(out ReadResult readResult);

                    // Short circuit if we have the data or if we enter another terminal state
                    if (_unconsumedBytes >= minimumBytes || readResult.IsCanceled || readResult.IsCompleted)
                    {
                        return new ValueTask<ReadResult>(readResult);
                    }

                    // We don't have enough data so we need to reset the reader awaitable
                    _readerAwaitable.SetUncompleted();

                    // We also need to flip the reading state off
                    _operationState.EndRead();

                    // Begin read again to wire up cancellation token
                    _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
                }

                // If the writer is currently paused and we are about the wait for more data then this would deadlock.
                // The writer is paused at the pause threshold but the reader needs a minimum amount in order to make progress.
                // We resume the writer so that we can unblock this read.
                if (!_writerAwaitable.IsCompleted)
                {
                    _writerAwaitable.Complete(out completionData);
                }

                // Set the minimum read bytes if we need to wait
                _minimumReadBytes = minimumBytes;

                // Otherwise it's async
                result = new ValueTask<ReadResult>(_reader, token: 0);
            }

            // Scheduled outside the lock; does nothing when the writer was not paused
            TrySchedule(WriterScheduler, in completionData);

            return result;
        }
 
        /// <summary>
        /// Starts a read. If the reader awaitable is already complete the result is returned synchronously;
        /// otherwise the returned value is backed by the pipe's reader and completes later.
        /// </summary>
        /// <param name="token">Token registered with the reader awaitable; an already-canceled token yields a canceled task.</param>
        internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
        {
            if (_readerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
            }

            if (token.IsCancellationRequested)
            {
                Task<ReadResult> canceled = Task.FromCanceled<ReadResult>(token);
                return new ValueTask<ReadResult>(canceled);
            }

            lock (SyncObj)
            {
                _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);

                if (!_readerAwaitable.IsCompleted)
                {
                    // Nothing to hand out yet: the read completes asynchronously
                    return new ValueTask<ReadResult>(_reader, token: 0);
                }

                // The awaitable is already complete, so the result can be returned directly
                GetReadResult(out ReadResult available);
                return new ValueTask<ReadResult>(available);
            }
        }
 
        /// <summary>
        /// Attempts a synchronous read. Succeeds when there are unconsumed bytes or the reader awaitable is
        /// already complete; otherwise marks a tentative read and reports failure.
        /// </summary>
        /// <param name="result">Receives the read result on success, or <c>default</c> on failure.</param>
        /// <returns><see langword="true"/> if a result was produced; otherwise <see langword="false"/>.</returns>
        internal bool TryRead(out ReadResult result)
        {
            lock (SyncObj)
            {
                if (_readerCompletion.IsCompleted)
                {
                    ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
                }

                bool nothingAvailable = _unconsumedBytes <= 0 && !_readerAwaitable.IsCompleted;
                if (!nothingAvailable)
                {
                    GetReadResult(out result);
                    return true;
                }

                // An asynchronous read is already running on the reader awaitable
                if (_readerAwaitable.IsRunning)
                {
                    ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
                }

                _operationState.BeginReadTentative();
                result = default;
                return false;
            }
        }
 
        /// <summary>
        /// Schedules execution of <paramref name="completionCallbacks"/> on <paramref name="scheduler"/>.
        /// </summary>
        private static void ScheduleCallbacks(PipeScheduler scheduler, PipeCompletionCallbacks completionCallbacks)
        {
            Debug.Assert(completionCallbacks != null);

            // The shared static delegate casts the state back and calls Execute() on it
            Action<object?> runCallbacks = s_invokeCompletionCallbacks;
            scheduler.UnsafeSchedule(runCallbacks, completionCallbacks);
        }
 
        /// <summary>
        /// Schedules the continuation held by <paramref name="completionData"/>, if there is one. Without a
        /// captured synchronization or execution context it goes straight to <paramref name="scheduler"/>;
        /// otherwise the context-aware path is taken.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData)
        {
            Action<object?> continuation = completionData.Completion;
            if (continuation is null)
            {
                // Nothing to do
                return;
            }

            // Ultimately, we need to call either
            // 1. The sync context with a delegate
            // 2. The scheduler with a delegate
            // That delegate and state will either be the action passed in directly
            // or it will be that specified delegate wrapped in ExecutionContext.Run
            if (completionData.SynchronizationContext is null && completionData.ExecutionContext is null)
            {
                // Common fast-path
                scheduler.UnsafeSchedule(continuation, completionData.CompletionState);
                return;
            }

            ScheduleWithContext(scheduler, in completionData);
        }
 
        /// <summary>
        /// Slow path of <c>TrySchedule</c>: schedules a continuation that captured a synchronization context,
        /// an execution context, or both.
        /// </summary>
        /// <param name="scheduler">The scheduler used when no synchronization context was captured.</param>
        /// <param name="completionData">The continuation, its state and the captured contexts.</param>
        [MethodImpl(MethodImplOptions.NoInlining)]
        private static void ScheduleWithContext(PipeScheduler scheduler, in CompletionData completionData)
        {
            Debug.Assert(completionData.SynchronizationContext != null || completionData.ExecutionContext != null);

            SynchronizationContext? syncContext = completionData.SynchronizationContext;
            if (syncContext is null)
            {
                // Only an execution context was captured: go through the scheduler and let the
                // callback run the continuation on that execution context
                scheduler.UnsafeSchedule(s_scheduleWithExecutionContextCallback, completionData);
                return;
            }

            // Post to the sync context; which callback is used depends on whether the continuation
            // must also run under a captured execution context
            SendOrPostCallback postCallback = completionData.ExecutionContext is null
                ? s_syncContextExecuteWithoutExecutionContextCallback
                : s_syncContextExecutionContextCallback;

            // The struct gets boxed here since there's no generic overload for state
            syncContext.Post(postCallback, completionData);
        }
 
        /// <summary>Unboxes the <c>CompletionData</c> passed as <paramref name="state" /> and invokes its continuation directly.</summary>
        /// <param name="state">A boxed <c>CompletionData</c>.</param>
        private static void ExecuteWithoutExecutionContext(object state)
        {
            var data = (CompletionData)state;
            data.Completion.Invoke(data.CompletionState);
        }
 
        /// <summary>
        /// Runs the continuation described by <paramref name="state" /> under the execution context it captured.
        /// </summary>
        /// <param name="state">A boxed <c>CompletionData</c> whose execution context is expected to be non-null.</param>
        private static void ExecuteWithExecutionContext(object state)
        {
            var data = (CompletionData)state;
            ExecutionContext? capturedContext = data.ExecutionContext;
            Debug.Assert(capturedContext != null);

            // The still-boxed state is forwarded as-is; the raw callback unboxes it again and invokes the continuation
            ExecutionContext.Run(capturedContext, s_executionContextRawCallback, state);
        }
 
        /// <summary>
        /// Marks the pipe as disposed, resets every buffer segment reachable from the read side and
        /// clears the read/write bookkeeping. Does nothing if the pipe is already disposed.
        /// </summary>
        private void CompletePipe()
        {
            lock (SyncObj)
            {
                if (_disposed)
                {
                    // Already torn down
                    return;
                }

                _disposed = true;

                // Walk the chain from the read head. When there is no read head, start from _readTail
                // instead, because there might be a block allocated for writing.
                BufferSegment? current = _readHead ?? _readTail;
                while (current != null)
                {
                    // Capture the successor before the current segment is reset
                    BufferSegment? next = current.NextSegment;
                    current.Reset();
                    current = next;
                }

                _writingHead = null;
                _writingHeadMemory = default;
                _readHead = null;
                _readTail = null;
                _lastExaminedIndex = -1;
            }
        }
 
        /// <summary>Reports the status of the pending read operation.</summary>
        /// <returns>
        /// <see cref="ValueTaskSourceStatus.Pending" /> while the reader awaitable has not completed; otherwise
        /// <see cref="ValueTaskSourceStatus.Faulted" /> when the writer completion is faulted, or
        /// <see cref="ValueTaskSourceStatus.Succeeded" />.
        /// </returns>
        internal ValueTaskSourceStatus GetReadAsyncStatus()
        {
            if (!_readerAwaitable.IsCompleted)
            {
                return ValueTaskSourceStatus.Pending;
            }

            return _writerCompletion.IsFaulted
                ? ValueTaskSourceStatus.Faulted
                : ValueTaskSourceStatus.Succeeded;
        }
 
        /// <summary>Registers <paramref name="continuation" /> on the reader awaitable.</summary>
        /// <param name="continuation">The callback to register.</param>
        /// <param name="state">The state object passed along with <paramref name="continuation" />.</param>
        /// <param name="flags">The flags forwarded to the reader awaitable.</param>
        internal void OnReadAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
        {
            CompletionData pendingCompletion;
            bool wasDoubleCompletion;

            // Only the awaitable bookkeeping runs under the lock
            lock (SyncObj)
            {
                _readerAwaitable.OnCompleted(continuation, state, flags, out pendingCompletion, out wasDoubleCompletion);
            }

            if (wasDoubleCompletion)
            {
                // The awaitable reported a double completion: complete the writer with the
                // "no concurrent operation" exception
                Writer.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
            }

            // Schedules whatever completion the awaitable handed back; a no-op when it holds no continuation
            TrySchedule(ReaderScheduler, pendingCompletion);
        }
 
        /// <summary>Produces the <see cref="ReadResult" /> for a read whose awaitable has completed.</summary>
        /// <returns>The read result built by <c>GetReadResult</c>.</returns>
        internal ReadResult GetReadAsyncResult()
        {
            ReadResult readResult;
            CancellationToken token = default;
            CancellationTokenRegistration registration = default;

            try
            {
                lock (SyncObj)
                {
                    // Asking for the result before the awaitable has completed is a usage error
                    if (!_readerAwaitable.IsCompleted)
                    {
                        ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
                    }

                    // Take the registration (and its token) out of the awaitable while holding the lock
                    registration = _readerAwaitable.ReleaseCancellationTokenRegistration(out token);
                    GetReadResult(out readResult);
                }
            }
            finally
            {
                // Disposed after the lock has been released
                registration.Dispose();
            }

            // Only a result flagged as canceled checks the token and throws if it was canceled
            if (readResult.IsCanceled)
            {
                token.ThrowIfCancellationRequested();
            }

            return readResult;
        }
 
        /// <summary>
        /// Builds the <see cref="ReadResult" /> from the current read head/tail and moves the operation
        /// state into reading.
        /// </summary>
        /// <param name="result">Receives the buffer together with the canceled and completed flags.</param>
        private void GetReadResult(out ReadResult result)
        {
            // IsCompletedOrThrow runs first, so if it throws the cancellation is left unobserved
            bool writerCompleted = _writerCompletion.IsCompletedOrThrow();
            bool canceled = _readerAwaitable.ObserveCancellation();

            // Without a read head there is nothing to expose, so the buffer stays at its default value
            ReadOnlySequence<byte> buffer = default;
            BufferSegment? readHead = _readHead;
            if (readHead != null)
            {
                Debug.Assert(_readTail != null);
                // Reading commit head shared with writer
                buffer = new ReadOnlySequence<byte>(readHead, _readHeadIndex, _readTail, _readTailIndex);
            }

            result = new ReadResult(buffer, canceled, writerCompleted);

            if (!canceled)
            {
                _operationState.BeginRead();
            }
            else
            {
                // A canceled read only begins tentatively
                _operationState.BeginReadTentative();
            }

            // Reset the minimum read bytes when read yields
            _minimumReadBytes = 0;
        }
 
        /// <summary>Reports the status of the pending flush operation.</summary>
        /// <returns>
        /// <see cref="ValueTaskSourceStatus.Pending" /> while the writer awaitable has not completed; otherwise
        /// <see cref="ValueTaskSourceStatus.Faulted" /> when the reader completion is faulted, or
        /// <see cref="ValueTaskSourceStatus.Succeeded" />.
        /// </returns>
        internal ValueTaskSourceStatus GetFlushAsyncStatus()
        {
            if (!_writerAwaitable.IsCompleted)
            {
                return ValueTaskSourceStatus.Pending;
            }

            return _readerCompletion.IsFaulted
                ? ValueTaskSourceStatus.Faulted
                : ValueTaskSourceStatus.Succeeded;
        }
 
        /// <summary>Produces the <see cref="FlushResult" /> for a flush whose awaitable has completed.</summary>
        /// <returns>The flush result populated by <c>GetFlushResult</c>.</returns>
        internal FlushResult GetFlushAsyncResult()
        {
            FlushResult result = default;
            CancellationToken cancellationToken = default;
            CancellationTokenRegistration cancellationTokenRegistration = default;

            try
            {
                lock (SyncObj)
                {
                    // Asking for the result before the awaitable has completed is a usage error
                    if (!_writerAwaitable.IsCompleted)
                    {
                        ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
                    }

                    GetFlushResult(ref result);

                    // Take the registration (and its token) out of the awaitable while holding the lock
                    cancellationTokenRegistration = _writerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
                }
            }
            finally
            {
                // Runs after the lock has been released. If the try block threw before
                // ReleaseCancellationTokenRegistration was called, both locals still hold their defaults.
                cancellationTokenRegistration.Dispose();
                // NOTE(review): unlike GetReadAsyncResult, the token is checked regardless of whether the
                // result is flagged as canceled, and the throw happens from inside the finally block.
                cancellationToken.ThrowIfCancellationRequested();
            }

            return result;
        }
 
        /// <summary>Gets the current value of the unflushed-bytes counter.</summary>
        /// <returns>The value of <c>_unflushedBytes</c>, read without taking the lock.</returns>
        internal long GetUnflushedBytes()
        {
            return _unflushedBytes;
        }
 
        /// <summary>Adds the canceled and completed flags that currently apply to <paramref name="result" />.</summary>
        /// <param name="result">The flush result whose flags are updated in place.</param>
        private void GetFlushResult(ref FlushResult result)
        {
            // Observing moves the awaitable from canceled to observed; report that on the result
            bool wasCanceled = _writerAwaitable.ObserveCancellation();
            if (wasCanceled)
            {
                result._resultFlags |= ResultFlags.Canceled;
            }

            // Checked second, so cancellation has already been observed if this call throws
            bool readerCompleted = _readerCompletion.IsCompletedOrThrow();
            if (readerCompleted)
            {
                result._resultFlags |= ResultFlags.Completed;
            }
        }
 
        /// <summary>Copies <paramref name="source" /> into the pipe and then starts a flush.</summary>
        /// <param name="source">The bytes to write.</param>
        /// <param name="cancellationToken">The token checked before writing and passed on to the flush.</param>
        /// <returns>
        /// A completed result flagged as completed when the reader has already completed, a canceled task when
        /// <paramref name="cancellationToken" /> is already canceled, or otherwise the flush operation.
        /// </returns>
        internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
        {
            if (_writerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
            }

            if (_readerCompletion.IsCompletedOrThrow())
            {
                // Nobody will read the data; report completion without writing anything
                var completedResult = new FlushResult(isCanceled: false, isCompleted: true);
                return new ValueTask<FlushResult>(completedResult);
            }

            if (cancellationToken.IsCancellationRequested)
            {
                Task<FlushResult> canceledTask = Task.FromCanceled<FlushResult>(cancellationToken);
                return new ValueTask<FlushResult>(canceledTask);
            }

            ValueTask<FlushResult> flushTask;
            CompletionData continuationData;

            lock (SyncObj)
            {
                // Allocate whatever the pool gives us so we can write, this also marks the
                // state as writing
                AllocateWriteHeadIfNeeded(0);

                if (source.Length > _writingHeadMemory.Length)
                {
                    // Too large for the current write head: take the multi segment copy
                    WriteMultiSegment(source.Span);
                }
                else
                {
                    // Fits in the current write head: a single copy is enough
                    source.CopyTo(_writingHeadMemory);

                    AdvanceCore(source.Length);
                }

                PrepareFlushUnsynchronized(out continuationData, out flushTask, cancellationToken);
            }

            // The completion handed back by the flush is scheduled only after the lock is released
            TrySchedule(ReaderScheduler, continuationData);
            return flushTask;
        }
 
        /// <summary>
        /// Copies <paramref name="source" /> into the pipe across as many segments as needed, allocating and
        /// linking a new segment each time the current one has been filled. <c>WriteAsync</c> calls this when
        /// the source is longer than the current write head memory.
        /// </summary>
        /// <param name="source">The bytes to copy.</param>
        private void WriteMultiSegment(ReadOnlySpan<byte> source)
        {
            Debug.Assert(_writingHead != null);
            Span<byte> destination = _writingHeadMemory.Span;

            while (true)
            {
                // Copy as much as fits into the current destination, then drop the copied part from source
                int writable = Math.Min(destination.Length, source.Length);
                source.Slice(0, writable).CopyTo(destination);
                source = source.Slice(writable);
                AdvanceCore(writable);

                // Everything has been copied
                if (source.Length == 0)
                {
                    break;
                }

                // We filled the segment: fold the buffered byte count into its End and reset the counter
                _writingHead.End += _writingHeadBytesBuffered;
                _writingHeadBytesBuffered = 0;

                // This is optimized to use pooled memory. That's why we pass 0 instead of
                // source.Length
                BufferSegment newSegment = AllocateSegment(0);

                // Link the new segment after the current write head and make it the new write head
                _writingHead.SetNext(newSegment);
                _writingHead = newSegment;

                // presumably AllocateSegment has pointed _writingHeadMemory at the new segment - TODO confirm
                destination = _writingHeadMemory.Span;
            }
        }
 
        /// <summary>Registers <paramref name="continuation" /> on the writer awaitable.</summary>
        /// <param name="continuation">The callback to register.</param>
        /// <param name="state">The state object passed along with <paramref name="continuation" />.</param>
        /// <param name="flags">The flags forwarded to the writer awaitable.</param>
        internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
        {
            CompletionData pendingCompletion;
            bool wasDoubleCompletion;

            // Only the awaitable bookkeeping runs under the lock
            lock (SyncObj)
            {
                _writerAwaitable.OnCompleted(continuation, state, flags, out pendingCompletion, out wasDoubleCompletion);
            }

            if (wasDoubleCompletion)
            {
                // The awaitable reported a double completion: complete the reader with the
                // "no concurrent operation" exception
                Reader.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
            }

            // Schedules whatever completion the awaitable handed back; a no-op when it holds no continuation
            TrySchedule(WriterScheduler, pendingCompletion);
        }
 
        /// <summary>
        /// Notifies the reader awaitable that its cancellation token fired and schedules the completion it
        /// hands back on the reader scheduler.
        /// </summary>
        private void ReaderCancellationRequested()
        {
            CompletionData pendingCompletion;

            lock (SyncObj)
            {
                _readerAwaitable.CancellationTokenFired(out pendingCompletion);
            }

            // Scheduled after the lock has been released
            TrySchedule(ReaderScheduler, pendingCompletion);
        }
 
        /// <summary>
        /// Notifies the writer awaitable that its cancellation token fired and schedules the completion it
        /// hands back on the writer scheduler.
        /// </summary>
        private void WriterCancellationRequested()
        {
            CompletionData pendingCompletion;

            lock (SyncObj)
            {
                _writerAwaitable.CancellationTokenFired(out pendingCompletion);
            }

            // Scheduled after the lock has been released
            TrySchedule(WriterScheduler, pendingCompletion);
        }
 
        /// <summary>Gets the <see cref="System.IO.Pipelines.PipeReader" /> that belongs to this pipe.</summary>
        /// <value>The <see cref="System.IO.Pipelines.PipeReader" /> instance of this pipe.</value>
        public PipeReader Reader
        {
            get { return _reader; }
        }
 
        /// <summary>Gets the <see cref="System.IO.Pipelines.PipeWriter" /> that belongs to this pipe.</summary>
        /// <value>The <see cref="System.IO.Pipelines.PipeWriter" /> instance of this pipe.</value>
        public PipeWriter Writer
        {
            get { return _writer; }
        }
 
        /// <summary>Resets the pipe's state once the pipe has been completed.</summary>
        public void Reset()
        {
            lock (SyncObj)
            {
                // A reset is only allowed once the pipe has been disposed (the flag is set by CompletePipe)
                bool canReset = _disposed;
                if (!canReset)
                {
                    ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter();
                }

                _disposed = false;
                ResetState();
            }
        }
    }
}