// File: Internal\Infrastructure\PipeWriterHelpers\TimingPipeFlusher.cs
// Web Access
// Project: src\src\Servers\Kestrel\Core\src\Microsoft.AspNetCore.Server.Kestrel.Core.csproj (Microsoft.AspNetCore.Server.Kestrel.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
#nullable enable
 
using System.IO.Pipelines;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.Extensions.Logging;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
 
/// <summary>
/// Wraps <see cref="PipeWriter.FlushAsync(CancellationToken)"/> so that each flush can be reported to an
/// <see cref="ITimeoutControl"/> (bytes written, plus start/stop timing around flushes that do not complete
/// synchronously) and so that an optional <see cref="IHttpOutputAborter"/> is notified when the flush reports
/// completion or is canceled.
/// </summary>
/// <remarks>
/// <see cref="Initialize(PipeWriter)"/> must be called before any <c>FlushAsync</c> overload; until then the
/// underlying writer is <c>null</c>.
/// </remarks>
internal sealed class TimingPipeFlusher
{
    // Assigned by Initialize rather than by the constructor.
    private PipeWriter _writer = default!;
    private readonly ITimeoutControl? _timeoutControl;
    private readonly KestrelTrace _log;

    /// <summary>
    /// Creates a flusher that is not yet bound to a <see cref="PipeWriter"/>.
    /// </summary>
    /// <param name="timeoutControl">
    /// Receives write-timing notifications. May be <c>null</c>, but then callers must never pass a non-null
    /// minimum data rate to <c>FlushAsync</c>, because the timeout control is dereferenced whenever one is supplied.
    /// </param>
    /// <param name="log">Logger used to record unexpected flush failures.</param>
    public TimingPipeFlusher(
        ITimeoutControl? timeoutControl,
        KestrelTrace log)
    {
        _timeoutControl = timeoutControl;
        _log = log;
    }

    /// <summary>
    /// Sets the <see cref="PipeWriter"/> that subsequent flushes operate on.
    /// </summary>
    /// <param name="output">The writer to flush.</param>
    public void Initialize(PipeWriter output)
    {
        _writer = output;
    }

    /// <summary>
    /// Flushes with no minimum data rate, no output aborter and no cancellation.
    /// </summary>
    /// <returns>The result of the underlying pipe flush.</returns>
    public ValueTask<FlushResult> FlushAsync()
    {
        return FlushAsync(outputAborter: null, cancellationToken: default);
    }

    /// <summary>
    /// Flushes with no minimum data rate.
    /// </summary>
    /// <param name="outputAborter">Optional aborter to notify on completion or cancellation.</param>
    /// <param name="cancellationToken">Token passed to the underlying pipe flush.</param>
    /// <returns>The result of the underlying pipe flush.</returns>
    public ValueTask<FlushResult> FlushAsync(IHttpOutputAborter? outputAborter, CancellationToken cancellationToken)
    {
        return FlushAsync(minRate: null, count: 0, outputAborter: outputAborter, cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Flushes with the given minimum data rate, without an output aborter or cancellation.
    /// </summary>
    /// <param name="minRate">Minimum data rate to report to the timeout control, or <c>null</c> to skip timing.</param>
    /// <param name="count">Number of bytes reported to the timeout control when <paramref name="minRate"/> is not <c>null</c>.</param>
    /// <returns>The result of the underlying pipe flush.</returns>
    public ValueTask<FlushResult> FlushAsync(MinDataRate? minRate, long count)
    {
        return FlushAsync(minRate, count, outputAborter: null, cancellationToken: default);
    }

    /// <summary>
    /// Flushes the underlying <see cref="PipeWriter"/>, reporting to the timeout control and the output aborter.
    /// </summary>
    /// <param name="minRate">Minimum data rate to report to the timeout control, or <c>null</c> to skip timing.</param>
    /// <param name="count">Number of bytes reported to the timeout control when <paramref name="minRate"/> is not <c>null</c>.</param>
    /// <param name="outputAborter">
    /// Optional aborter. It is notified via <c>OnInputOrOutputCompleted</c> when the flush result is completed, and
    /// via <c>Abort</c> when an awaited flush throws <see cref="OperationCanceledException"/>.
    /// </param>
    /// <param name="cancellationToken">Token passed to the underlying pipe flush.</param>
    /// <returns>
    /// The result of the underlying pipe flush. If an awaited flush throws and the exception is handled here,
    /// a default <see cref="FlushResult"/> is returned instead.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown from the awaited path when <paramref name="cancellationToken"/> has been canceled by the time the flush finishes.
    /// </exception>
    public ValueTask<FlushResult> FlushAsync(MinDataRate? minRate, long count, IHttpOutputAborter? outputAborter, CancellationToken cancellationToken)
    {
        if (minRate is not null)
        {
            // Call BytesWrittenToBuffer before FlushAsync() to make testing easier, otherwise the Flush can cause test code to run before the timeout
            // control updates and if the test checks for a timeout it can fail
            _timeoutControl!.BytesWrittenToBuffer(minRate, count);
        }

        var pipeFlushTask = _writer.FlushAsync(cancellationToken);

        // Fast path: the flush already finished successfully, so no write timing or async state machine is needed.
        if (pipeFlushTask.IsCompletedSuccessfully)
        {
            var flushResult = pipeFlushTask.Result;

            if (flushResult.IsCompleted && outputAborter is not null)
            {
                outputAborter.OnInputOrOutputCompleted();
            }

            return new ValueTask<FlushResult>(flushResult);
        }

        return TimeFlushAsyncAwaited(pipeFlushTask, minRate, outputAborter, cancellationToken);
    }

    /// <summary>
    /// Awaits a flush that did not complete synchronously, bracketing the wait with
    /// <c>StartTimingWrite</c>/<c>StopTimingWrite</c> when a minimum data rate is supplied.
    /// </summary>
    /// <returns>
    /// The awaited flush result, or a default <see cref="FlushResult"/> when the flush threw and the exception was handled.
    /// </returns>
    private async ValueTask<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pipeFlushTask, MinDataRate? minRate, IHttpOutputAborter? outputAborter, CancellationToken cancellationToken)
    {
        if (minRate is not null)
        {
            _timeoutControl!.StartTimingWrite();
        }

        // Stays at default when the flush throws and the exception is handled below.
        FlushResult flushResult = default;

        try
        {
            flushResult = await pipeFlushTask;

            if (flushResult.IsCompleted && outputAborter is not null)
            {
                outputAborter.OnInputOrOutputCompleted();
            }
        }
        catch (OperationCanceledException ex) when (outputAborter is not null)
        {
            outputAborter.Abort(new ConnectionAbortedException(CoreStrings.ConnectionOrStreamAbortedByCancellationToken, ex), ConnectionEndReason.WriteCanceled);
        }
        catch (Exception ex)
        {
            // A canceled token is the only reason flush should ever throw.
            _log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(FlushAsync)}.");
        }
        finally
        {
            if (minRate is not null)
            {
                _timeoutControl!.StopTimingWrite();
            }

            // Surface cancellation to the caller even though the exception itself was handled above.
            cancellationToken.ThrowIfCancellationRequested();
        }

        // Return the real result so the awaited path agrees with the synchronous fast path in FlushAsync.
        return flushResult;
    }
}