File: Internal\QuicStreamContext.cs
Web Access
Project: src\src\Servers\Kestrel\Transport.Quic\src\Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.csproj (Microsoft.AspNetCore.Server.Kestrel.Transport.Quic)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Quic;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal;
 
internal partial class QuicStreamContext : TransportConnection, IPooledStream, IDisposable
{
    private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The QUIC transport's send loop completed gracefully.");
 
    // Internal for testing.
    internal Task _processingTask = Task.CompletedTask;
 
    private QuicStream? _stream;
    private readonly QuicConnectionContext _connection;
    private readonly QuicTransportContext _context;
    private readonly Pipe _inputPipe;
    private readonly Pipe _outputPipe;
    private readonly IDuplexPipe _originalTransport;
    private readonly IDuplexPipe _originalApplication;
    private readonly CompletionPipeReader _transportPipeReader;
    private readonly CompletionPipeWriter _transportPipeWriter;
    private readonly ILogger _log;
    private CancellationTokenSource? _streamClosedTokenSource;
    private string? _connectionId;
    private const int MinAllocBufferSize = 4096;
    private volatile Exception? _shutdownReadReason;
    private volatile Exception? _shutdownWriteReason;
    private volatile Exception? _shutdownReason;
    private volatile Exception? _writeAbortException;
    private bool _streamClosed;
    private bool _serverAborted;
    private bool _clientAbort;
    private readonly Lock _shutdownLock = new();
 
    public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext context)
    {
        _connection = connection;
        _context = context;
        _log = context.Log;
        MemoryPool = connection.MemoryPool;
        MultiplexedConnectionFeatures = connection.Features;
 
        RemoteEndPoint = connection.RemoteEndPoint;
        LocalEndPoint = connection.LocalEndPoint;
 
        var maxReadBufferSize = context.Options.MaxReadBufferSize ?? 0;
        var maxWriteBufferSize = context.Options.MaxWriteBufferSize ?? 0;
 
        // TODO should we allow these PipeScheduler to be configurable here?
        var inputOptions = new PipeOptions(MemoryPool, PipeScheduler.ThreadPool, PipeScheduler.Inline, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
        var outputOptions = new PipeOptions(MemoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
 
        _inputPipe = new Pipe(inputOptions);
        _outputPipe = new Pipe(outputOptions);
 
        _transportPipeReader = new CompletionPipeReader(_inputPipe.Reader);
        _transportPipeWriter = new CompletionPipeWriter(_outputPipe.Writer);
 
        _originalApplication = new DuplexPipe(_outputPipe.Reader, _inputPipe.Writer);
        _originalTransport = new DuplexPipe(_transportPipeReader, _transportPipeWriter);
    }
 
    public override MemoryPool<byte> MemoryPool { get; }
    private PipeWriter Input => Application.Output;
    private PipeReader Output => Application.Input;
 
    public bool CanReuse { get; private set; }
 
    public void Initialize(QuicStream stream)
    {
        Debug.Assert(_stream == null);
 
        _stream = stream;
 
        _streamClosedTokenSource = null;
        _onClosedRegistrations?.Clear();
 
        InitializeFeatures();
 
        CanRead = _stream.CanRead;
        CanWrite = _stream.CanWrite;
        _error = null;
        StreamId = _stream.Id;
        PoolExpirationTimestamp = 0;
 
        Transport = _originalTransport;
        Application = _originalApplication;
 
        _transportPipeReader.Reset();
        _transportPipeWriter.Reset();
 
        _connectionId = null;
        _shutdownReason = null;
        _writeAbortException = null;
        _streamClosed = false;
        _serverAborted = false;
        _clientAbort = false;
 
        // Only reset pipes if the stream has been reused.
        if (CanReuse)
        {
            _inputPipe.Reset();
            _outputPipe.Reset();
        }
 
        CanReuse = false;
    }
 
    public override CancellationToken ConnectionClosed
    {
        get
        {
            // Allocate CTS only if requested.
            if (_streamClosedTokenSource == null)
            {
                _streamClosedTokenSource = new CancellationTokenSource();
            }
            return _streamClosedTokenSource.Token;
        }
        set => throw new NotSupportedException();
    }
 
    public override string ConnectionId
    {
        get => _connectionId ??= StringUtilities.ConcatAsHexSuffix(_connection.ConnectionId, ':', (uint)StreamId);
        set => _connectionId = value;
    }
 
    public long PoolExpirationTimestamp { get; set; }
 
    public void Start()
    {
        Debug.Assert(_processingTask.IsCompletedSuccessfully);
 
        _processingTask = StartAsync();
    }
 
    private async Task StartAsync()
    {
        Debug.Assert(_stream != null);
 
        try
        {
            // Spawn send and receive logic
            // Streams may or may not have reading/writing, so only start tasks accordingly
            var receiveTask = ValueTask.CompletedTask;
            var sendTask = ValueTask.CompletedTask;
 
            if (_stream.CanRead)
            {
                receiveTask = DoReceiveAsync();
            }
 
            if (_stream.CanWrite)
            {
                sendTask = DoSendAsync();
            }
 
            // Now wait for both to complete
            await receiveTask;
            await sendTask;
 
            FireStreamClosed();
        }
        catch (Exception ex)
        {
            _log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(StartAsync)}.");
        }
    }
 
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    private async ValueTask WaitForWritesClosedAsync()
    {
        Debug.Assert(_stream != null);
 
        try
        {
            await _stream.WritesClosed;
        }
        catch (Exception ex)
        {
            // Send error to DoSend loop.
            _writeAbortException = ex;
        }
        finally
        {
            Output.CancelPendingRead();
        }
    }
 
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    private async ValueTask DoReceiveAsync()
    {
        Debug.Assert(_stream != null);
 
        Exception? error = null;
 
        try
        {
            var input = Input;
            while (true)
            {
                var buffer = input.GetMemory(MinAllocBufferSize);
                var bytesReceived = await _stream.ReadAsync(buffer);
 
                if (bytesReceived == 0)
                {
                    // Read completed.
                    break;
                }
 
                input.Advance(bytesReceived);
 
                ValueTask<FlushResult> flushTask;
 
                if (_stream.ReadsClosed.IsCompletedSuccessfully)
                {
                    // If the data returned from ReadAsync is the final chunk on the stream then
                    // flush data and end pipe together with CompleteAsync.
                    //
                    // Getting data and complete together is important for HTTP/3 when parsing headers.
                    // It is important that it knows that there is no body after the headers.
                    var completeTask = input.CompleteAsync();
                    if (completeTask.IsCompletedSuccessfully)
                    {
                        // Fast path. CompleteAsync completed immediately.
                        // Most implementations of ValueTask reset state in GetResult.
                        completeTask.GetAwaiter().GetResult();
 
                        flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true));
                    }
                    else
                    {
                        flushTask = AwaitCompleteTaskAsync(completeTask);
                    }
                }
                else
                {
                    flushTask = input.FlushAsync();
                }
 
                var paused = !flushTask.IsCompleted;
 
                if (paused)
                {
                    QuicLog.StreamPause(_log, this);
                }
 
                var result = await flushTask;
 
                if (paused)
                {
                    QuicLog.StreamResume(_log, this);
                }
 
                if (result.IsCompleted || result.IsCanceled)
                {
                    // Pipe consumer is shut down, do we stop writing
                    break;
                }
            }
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.StreamAborted or QuicError.ConnectionAborted)
        {
            // Abort from peer.
            _error = ex.ApplicationErrorCode; // Trust Quic to provide us a valid error code
            QuicLog.StreamAbortedRead(_log, this, ex.ApplicationErrorCode.GetValueOrDefault());
 
            // This could be ignored if _shutdownReason is already set.
            error = new ConnectionResetException(ex.Message, ex);
 
            _clientAbort = true;
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.ConnectionIdle)
        {
            // Abort from timeout.
            QuicLog.StreamTimeoutRead(_log, this);
 
            // This could be ignored if _shutdownReason is already set.
            error = new ConnectionResetException(ex.Message, ex);
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.OperationAborted)
        {
            // AbortRead has been called for the stream.
            error = new ConnectionAbortedException(ex.Message, ex);
        }
        catch (Exception ex)
        {
            // This is unexpected.
            error = ex;
            QuicLog.StreamError(_log, this, error);
        }
        finally
        {
            // If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
            Input.Complete(ResolveCompleteReceiveException(error));
        }
 
        async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTask)
        {
            await completeTask;
            return new FlushResult(isCanceled: false, isCompleted: true);
        }
    }
 
    private Exception? ResolveCompleteReceiveException(Exception? error)
    {
        return _shutdownReadReason ?? _shutdownReason ?? error;
    }
 
    private void FireStreamClosed()
    {
        // Guard against scheduling this multiple times
        lock (_shutdownLock)
        {
            if (_streamClosed)
            {
                return;
            }
 
            _streamClosed = true;
        }
 
        var onClosed = _onClosedRegistrations;
 
        if (onClosed != null)
        {
            foreach (var closeAction in onClosed)
            {
                closeAction.Callback(closeAction.State);
            }
        }
 
        if (_streamClosedTokenSource != null)
        {
            CancelConnectionClosedToken();
        }
    }
 
    private void CancelConnectionClosedToken()
    {
        Debug.Assert(_streamClosedTokenSource != null);
 
        try
        {
            _streamClosedTokenSource.Cancel();
        }
        catch (Exception ex)
        {
            _log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(CancelConnectionClosedToken)}.");
        }
    }
 
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    private async ValueTask DoSendAsync()
    {
        Debug.Assert(_stream != null);
 
        Exception? shutdownReason = null;
        Exception? unexpectedError = null;
 
        // A client can abort a stream after it has finished sending data. We need a way to get that notification
        // which is why we listen for a notification that the write-side of the stream is done.
        // An exception can be thrown from the stream on client abort which will be captured and then wake up the output read.
        var waitForWritesClosedTask = WaitForWritesClosedAsync();
 
        try
        {
            // Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
            var output = Output;
            while (true)
            {
                var result = await output.ReadAsync();
 
                if (result.IsCanceled)
                {
                    // WaitForWritesCompleted provides immediate notification that write-side of stream has completed.
                    // If the stream or connection is aborted then exception will be available to rethrow.
                    if (_writeAbortException != null)
                    {
                        ExceptionDispatchInfo.Throw(_writeAbortException);
                    }
 
                    break;
                }
 
                var buffer = result.Buffer;
 
                var end = buffer.End;
                var isCompleted = result.IsCompleted;
                if (!buffer.IsEmpty)
                {
                    if (buffer.IsSingleSegment)
                    {
                        // Fast path when the buffer is a single segment.
                        await _stream.WriteAsync(buffer.First, completeWrites: isCompleted);
                    }
                    else
                    {
                        // When then buffer has multiple segments then write them in a loop.
                        // We're not using a standard foreach here because we want to detect
                        // the final write and pass end stream flag with that write.
                        var enumerator = buffer.GetEnumerator();
                        var isLastSegment = !enumerator.MoveNext();
 
                        while (!isLastSegment)
                        {
                            var currentSegment = enumerator.Current;
                            isLastSegment = !enumerator.MoveNext();
                            await _stream.WriteAsync(currentSegment, completeWrites: isLastSegment && isCompleted);
                        }
                    }
                }
 
                output.AdvanceTo(end);
 
                if (isCompleted)
                {
                    // Once the stream pipe is closed, shutdown the stream.
                    break;
                }
            }
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.StreamAborted or QuicError.ConnectionAborted)
        {
            // Abort from peer.
            _error = ex.ApplicationErrorCode; // Trust Quic to provide us a valid error code
            QuicLog.StreamAbortedWrite(_log, this, ex.ApplicationErrorCode.GetValueOrDefault());
 
            // This could be ignored if _shutdownReason is already set.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
 
            _clientAbort = true;
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.ConnectionIdle)
        {
            // Abort from timeout.
            QuicLog.StreamTimeoutWrite(_log, this);
 
            // This could be ignored if _shutdownReason is already set.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
        }
        catch (QuicException ex) when (ex.QuicError is QuicError.OperationAborted)
        {
            // AbortWrite has been called for the stream.
            // Possibily might also get here from connection closing.
            // System.Net.Quic exception handling not finalized.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
        }
        catch (Exception ex)
        {
            shutdownReason = ex;
            unexpectedError = ex;
            QuicLog.StreamError(_log, this, unexpectedError);
        }
        finally
        {
            ShutdownWrite(shutdownReason);
 
            await waitForWritesClosedTask;
 
            // Complete the output after completing stream sends
            Output.Complete(unexpectedError);
 
            // Cancel any pending flushes so that the input loop is un-paused
            Input.CancelPendingFlush();
        }
    }
 
    public override void Abort(ConnectionAbortedException abortReason)
    {
        // Make local copy of reference to avoid possibility of race with stream being set to null in dispose.
        var stream = _stream;
 
        lock (_shutdownLock)
        {
            // Abort called after dispose. Stream is set to null in dispose.
            if (stream == null)
            {
                return;
            }
 
            // This abort is called twice, make sure that doesn't happen.
            // Don't call _stream.Shutdown and _stream.Abort at the same time.
            if (_serverAborted)
            {
                return;
            }
 
            _serverAborted = true;
            _shutdownReason = abortReason;
        }
 
        var resolvedErrorCode = _error ?? 0; // _error is validated on assignment
        QuicLog.StreamAbort(_log, this, resolvedErrorCode, abortReason.Message);
 
        if (stream.CanRead)
        {
            stream.Abort(QuicAbortDirection.Read, resolvedErrorCode);
        }
        if (stream.CanWrite)
        {
            stream.Abort(QuicAbortDirection.Write, resolvedErrorCode);
        }
 
        // Cancel ProcessSends loop after calling shutdown to ensure the correct _shutdownReason gets set.
        Output.CancelPendingRead();
    }
 
    private void ShutdownWrite(Exception? shutdownReason)
    {
        Debug.Assert(_stream != null);
 
        try
        {
            lock (_shutdownLock)
            {
                _shutdownReason = _shutdownWriteReason ?? _shutdownReason ?? shutdownReason ?? SendGracefullyCompletedException;
                QuicLog.StreamShutdownWrite(_log, this, _shutdownReason.Message);
 
                // Only complete writes for a graceful shutdown.
                if (_shutdownReason == SendGracefullyCompletedException)
                {
                    _stream.CompleteWrites();
                }
            }
        }
        catch (Exception ex)
        {
            _log.LogWarning(ex, "Stream failed to gracefully shutdown.");
            // Ignore any errors from Shutdown() since we're tearing down the stream anyway.
        }
    }
 
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    public override async ValueTask DisposeAsync()
    {
        if (_stream == null)
        {
            return;
        }
 
        _originalTransport.Input.Complete();
        _originalTransport.Output.Complete();
 
        await _processingTask;
 
        await _stream.DisposeAsync();
 
        lock (_shutdownLock)
        {
            // CanReuse must not be calculated while draining stream. It is possible for
            // an abort to be received while any methods are still running.
            // It is safe to calculate CanReuse after processing is completed.
            //
            // Be conservative about what can be pooled.
            // Only pool bidirectional streams whose pipes have completed successfully and haven't been aborted.
            CanReuse = CanRead && CanWrite
                && _transportPipeReader.IsCompletedSuccessfully
                && _transportPipeWriter.IsCompletedSuccessfully
                && !_clientAbort
                && !_serverAborted
                && _shutdownReadReason == null
                && _shutdownWriteReason == null;
 
            if (!CanReuse)
            {
                DisposeCore();
            }
 
            // QuicStream can't be reused. Don't hang onto it when QuicStreamContext it potentially cached.
            _stream = null!;
        }
    }
 
    public void Dispose()
    {
        if (!_connection.TryReturnStream(this))
        {
            // Dispose when one of:
            // - Stream is not bidirection
            // - Stream didn't complete gracefully
            // - Pool is full
            DisposeCore();
        }
    }
 
    // Called when the stream is no longer reused.
    public void DisposeCore()
    {
        _streamClosedTokenSource?.Dispose();
    }
}