// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Quic;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal;
internal partial class QuicStreamContext : TransportConnection, IPooledStream, IDisposable
    private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The QUIC transport's send loop completed gracefully.");
    // Internal for testing.
    internal Task _processingTask = Task.CompletedTask;
    private QuicStream? _stream;
    private readonly QuicConnectionContext _connection;
    private readonly QuicTransportContext _context;
    private readonly Pipe _inputPipe;
    private readonly Pipe _outputPipe;
    private readonly IDuplexPipe _originalTransport;
    private readonly IDuplexPipe _originalApplication;
    private readonly CompletionPipeReader _transportPipeReader;
    private readonly CompletionPipeWriter _transportPipeWriter;
    private readonly ILogger _log;
    private CancellationTokenSource? _streamClosedTokenSource;
    private string? _connectionId;
    private const int MinAllocBufferSize = 4096;
    private volatile Exception? _shutdownReadReason;
    private volatile Exception? _shutdownWriteReason;
    private volatile Exception? _shutdownReason;
    private volatile Exception? _writeAbortException;
    private bool _streamClosed;
    private bool _serverAborted;
    private bool _clientAbort;
    private readonly Lock _shutdownLock = new();
    public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext context)
        _connection = connection;
        _context = context;
        _log = context.Log;
        MemoryPool = connection.MemoryPool;
        MultiplexedConnectionFeatures = connection.Features;
        RemoteEndPoint = connection.RemoteEndPoint;
        LocalEndPoint = connection.LocalEndPoint;
        var maxReadBufferSize = context.Options.MaxReadBufferSize ?? 0;
        var maxWriteBufferSize = context.Options.MaxWriteBufferSize ?? 0;
        // TODO should we allow these PipeScheduler to be configurable here?
        var inputOptions = new PipeOptions(MemoryPool, PipeScheduler.ThreadPool, PipeScheduler.Inline, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
        var outputOptions = new PipeOptions(MemoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
        _inputPipe = new Pipe(inputOptions);
        _outputPipe = new Pipe(outputOptions);
        _transportPipeReader = new CompletionPipeReader(_inputPipe.Reader);
        _transportPipeWriter = new CompletionPipeWriter(_outputPipe.Writer);
        _originalApplication = new DuplexPipe(_outputPipe.Reader, _inputPipe.Writer);
        _originalTransport = new DuplexPipe(_transportPipeReader, _transportPipeWriter);
    public override MemoryPool<byte> MemoryPool { get; }
    private PipeWriter Input => Application.Output;
    private PipeReader Output => Application.Input;
    public bool CanReuse { get; private set; }
    public void Initialize(QuicStream stream)
        Debug.Assert(_stream == null);
        _stream = stream;
        _streamClosedTokenSource = null;
        CanRead = _stream.CanRead;
        CanWrite = _stream.CanWrite;
        _error = null;
        StreamId = _stream.Id;
        PoolExpirationTimestamp = 0;
        Transport = _originalTransport;
        Application = _originalApplication;
        _connectionId = null;
        _shutdownReason = null;
        _writeAbortException = null;
        _streamClosed = false;
        _serverAborted = false;
        _clientAbort = false;
        // Only reset pipes if the stream has been reused.
        if (CanReuse)
        CanReuse = false;
    public override CancellationToken ConnectionClosed
            // Allocate CTS only if requested.
            if (_streamClosedTokenSource == null)
                _streamClosedTokenSource = new CancellationTokenSource();
            return _streamClosedTokenSource.Token;
        set => throw new NotSupportedException();
    public override string ConnectionId
        get => _connectionId ??= StringUtilities.ConcatAsHexSuffix(_connection.ConnectionId, ':', (uint)StreamId);
        set => _connectionId = value;
    public long PoolExpirationTimestamp { get; set; }
    public void Start()
        _processingTask = StartAsync();
    private async Task StartAsync()
        Debug.Assert(_stream != null);
            // Spawn send and receive logic
            // Streams may or may not have reading/writing, so only start tasks accordingly
            var receiveTask = ValueTask.CompletedTask;
            var sendTask = ValueTask.CompletedTask;
            if (_stream.CanRead)
                receiveTask = DoReceiveAsync();
            if (_stream.CanWrite)
                sendTask = DoSendAsync();
            // Now wait for both to complete
            await receiveTask;
            await sendTask;
        catch (Exception ex)
            _log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(StartAsync)}.");
    private async ValueTask WaitForWritesClosedAsync()
        Debug.Assert(_stream != null);
            await _stream.WritesClosed;
        catch (Exception ex)
            // Send error to DoSend loop.
            _writeAbortException = ex;
    private async ValueTask DoReceiveAsync()
        Debug.Assert(_stream != null);
        Exception? error = null;
            var input = Input;
            while (true)
                var buffer = input.GetMemory(MinAllocBufferSize);
                var bytesReceived = await _stream.ReadAsync(buffer);
                if (bytesReceived == 0)
                    // Read completed.
                ValueTask<FlushResult> flushTask;
                if (_stream.ReadsClosed.IsCompletedSuccessfully)
                    // If the data returned from ReadAsync is the final chunk on the stream then
                    // flush data and end pipe together with CompleteAsync.
                    // Getting data and complete together is important for HTTP/3 when parsing headers.
                    // It is important that it knows that there is no body after the headers.
                    var completeTask = input.CompleteAsync();
                    if (completeTask.IsCompletedSuccessfully)
                        // Fast path. CompleteAsync completed immediately.
                        // Most implementations of ValueTask reset state in GetResult.
                        flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true));
                        flushTask = AwaitCompleteTaskAsync(completeTask);
                    flushTask = input.FlushAsync();
                var paused = !flushTask.IsCompleted;
                if (paused)
                    QuicLog.StreamPause(_log, this);
                var result = await flushTask;
                if (paused)
                    QuicLog.StreamResume(_log, this);
                if (result.IsCompleted || result.IsCanceled)
                    // Pipe consumer is shut down, do we stop writing
        catch (QuicException ex) when (ex.QuicError is QuicError.StreamAborted or QuicError.ConnectionAborted)
            // Abort from peer.
            _error = ex.ApplicationErrorCode; // Trust Quic to provide us a valid error code
            QuicLog.StreamAbortedRead(_log, this, ex.ApplicationErrorCode.GetValueOrDefault());
            // This could be ignored if _shutdownReason is already set.
            error = new ConnectionResetException(ex.Message, ex);
            _clientAbort = true;
        catch (QuicException ex) when (ex.QuicError is QuicError.ConnectionIdle)
            // Abort from timeout.
            QuicLog.StreamTimeoutRead(_log, this);
            // This could be ignored if _shutdownReason is already set.
            error = new ConnectionResetException(ex.Message, ex);
        catch (QuicException ex) when (ex.QuicError is QuicError.OperationAborted)
            // AbortRead has been called for the stream.
            error = new ConnectionAbortedException(ex.Message, ex);
        catch (Exception ex)
            // This is unexpected.
            error = ex;
            QuicLog.StreamError(_log, this, error);
            // If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
        async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTask)
            await completeTask;
            return new FlushResult(isCanceled: false, isCompleted: true);
    private Exception? ResolveCompleteReceiveException(Exception? error)
        return _shutdownReadReason ?? _shutdownReason ?? error;
    private void FireStreamClosed()
        // Guard against scheduling this multiple times
        lock (_shutdownLock)
            if (_streamClosed)
            _streamClosed = true;
        var onClosed = _onClosedRegistrations;
        if (onClosed != null)
            foreach (var closeAction in onClosed)
        if (_streamClosedTokenSource != null)
    private void CancelConnectionClosedToken()
        Debug.Assert(_streamClosedTokenSource != null);
        catch (Exception ex)
            _log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(CancelConnectionClosedToken)}.");
    private async ValueTask DoSendAsync()
        Debug.Assert(_stream != null);
        Exception? shutdownReason = null;
        Exception? unexpectedError = null;
        // A client can abort a stream after it has finished sending data. We need a way to get that notification
        // which is why we listen for a notification that the write-side of the stream is done.
        // An exception can be thrown from the stream on client abort which will be captured and then wake up the output read.
        var waitForWritesClosedTask = WaitForWritesClosedAsync();
            // Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
            var output = Output;
            while (true)
                var result = await output.ReadAsync();
                if (result.IsCanceled)
                    // WaitForWritesCompleted provides immediate notification that write-side of stream has completed.
                    // If the stream or connection is aborted then exception will be available to rethrow.
                    if (_writeAbortException != null)
                var buffer = result.Buffer;
                var end = buffer.End;
                var isCompleted = result.IsCompleted;
                if (!buffer.IsEmpty)
                    if (buffer.IsSingleSegment)
                        // Fast path when the buffer is a single segment.
                        await _stream.WriteAsync(buffer.First, completeWrites: isCompleted);
                        // When then buffer has multiple segments then write them in a loop.
                        // We're not using a standard foreach here because we want to detect
                        // the final write and pass end stream flag with that write.
                        var enumerator = buffer.GetEnumerator();
                        var isLastSegment = !enumerator.MoveNext();
                        while (!isLastSegment)
                            var currentSegment = enumerator.Current;
                            isLastSegment = !enumerator.MoveNext();
                            await _stream.WriteAsync(currentSegment, completeWrites: isLastSegment && isCompleted);
                if (isCompleted)
                    // Once the stream pipe is closed, shutdown the stream.
        catch (QuicException ex) when (ex.QuicError is QuicError.StreamAborted or QuicError.ConnectionAborted)
            // Abort from peer.
            _error = ex.ApplicationErrorCode; // Trust Quic to provide us a valid error code
            QuicLog.StreamAbortedWrite(_log, this, ex.ApplicationErrorCode.GetValueOrDefault());
            // This could be ignored if _shutdownReason is already set.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
            _clientAbort = true;
        catch (QuicException ex) when (ex.QuicError is QuicError.ConnectionIdle)
            // Abort from timeout.
            QuicLog.StreamTimeoutWrite(_log, this);
            // This could be ignored if _shutdownReason is already set.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
        catch (QuicException ex) when (ex.QuicError is QuicError.OperationAborted)
            // AbortWrite has been called for the stream.
            // Possibily might also get here from connection closing.
            // System.Net.Quic exception handling not finalized.
            shutdownReason = new ConnectionResetException(ex.Message, ex);
        catch (Exception ex)
            shutdownReason = ex;
            unexpectedError = ex;
            QuicLog.StreamError(_log, this, unexpectedError);
            await waitForWritesClosedTask;
            // Complete the output after completing stream sends
            // Cancel any pending flushes so that the input loop is un-paused
    public override void Abort(ConnectionAbortedException abortReason)
        // Make local copy of reference to avoid possibility of race with stream being set to null in dispose.
        var stream = _stream;
        lock (_shutdownLock)
            // Abort called after dispose. Stream is set to null in dispose.
            if (stream == null)
            // This abort is called twice, make sure that doesn't happen.
            // Don't call _stream.Shutdown and _stream.Abort at the same time.
            if (_serverAborted)
            _serverAborted = true;
            _shutdownReason = abortReason;
        var resolvedErrorCode = _error ?? 0; // _error is validated on assignment
        QuicLog.StreamAbort(_log, this, resolvedErrorCode, abortReason.Message);
        if (stream.CanRead)
            stream.Abort(QuicAbortDirection.Read, resolvedErrorCode);
        if (stream.CanWrite)
            stream.Abort(QuicAbortDirection.Write, resolvedErrorCode);
        // Cancel ProcessSends loop after calling shutdown to ensure the correct _shutdownReason gets set.
    private void ShutdownWrite(Exception? shutdownReason)
        Debug.Assert(_stream != null);
            lock (_shutdownLock)
                _shutdownReason = _shutdownWriteReason ?? _shutdownReason ?? shutdownReason ?? SendGracefullyCompletedException;
                QuicLog.StreamShutdownWrite(_log, this, _shutdownReason.Message);
                // Only complete writes for a graceful shutdown.
                if (_shutdownReason == SendGracefullyCompletedException)
        catch (Exception ex)
            _log.LogWarning(ex, "Stream failed to gracefully shutdown.");
            // Ignore any errors from Shutdown() since we're tearing down the stream anyway.
    public override async ValueTask DisposeAsync()
        if (_stream == null)
        await _processingTask;
        await _stream.DisposeAsync();
        lock (_shutdownLock)
            // CanReuse must not be calculated while draining stream. It is possible for
            // an abort to be received while any methods are still running.
            // It is safe to calculate CanReuse after processing is completed.
            // Be conservative about what can be pooled.
            // Only pool bidirectional streams whose pipes have completed successfully and haven't been aborted.
            CanReuse = CanRead && CanWrite
                && _transportPipeReader.IsCompletedSuccessfully
                && _transportPipeWriter.IsCompletedSuccessfully
                && !_clientAbort
                && !_serverAborted
                && _shutdownReadReason == null
                && _shutdownWriteReason == null;
            if (!CanReuse)
            // QuicStream can't be reused. Don't hang onto it when QuicStreamContext it potentially cached.
            _stream = null!;
    public void Dispose()
        if (!_connection.TryReturnStream(this))
            // Dispose when one of:
            // - Stream is not bidirection
            // - Stream didn't complete gracefully
            // - Pool is full
    // Called when the stream is no longer reused.
    public void DisposeCore()