File: Core\OutputProducer.cs
Web Access
Project: src\src\Servers\IIS\IIS\src\Microsoft.AspNetCore.Server.IIS.csproj (Microsoft.AspNetCore.Server.IIS)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
 
namespace Microsoft.AspNetCore.Server.IIS.Core;
 
internal sealed class OutputProducer
{
    // This locks access to _completed and _aborted.
    private readonly object _contextLock = new object();
    private bool _completed;
    private volatile bool _aborted;
 
    private readonly Pipe _pipe;
 
    // https://github.com/dotnet/corefxlab/issues/1334
    // https://github.com/dotnet/aspnetcore/issues/8843
    // Pipelines don't support multiple awaiters on flush. This is temporary until it does.
    // _lastFlushTask field should only be get or set under the _flushLock.
    private readonly object _flushLock = new object();
    private Task _lastFlushTask = Task.CompletedTask;
 
    public OutputProducer(Pipe pipe)
    {
        _pipe = pipe;
    }
 
    public PipeReader Reader => _pipe.Reader;
 
    public bool Aborted => _aborted;
 
    public Task FlushAsync(CancellationToken cancellationToken)
    {
        _pipe.Reader.CancelPendingRead();
        // Await backpressure
        return FlushAsync(_pipe.Writer, cancellationToken);
    }
 
    public void Complete()
    {
        lock (_contextLock)
        {
            if (_completed || _aborted)
            {
                return;
            }
 
            _completed = true;
            _pipe.Writer.Complete();
        }
    }
 
    public void Abort()
    {
        lock (_contextLock)
        {
            if (_completed || _aborted)
            {
                return;
            }
 
            _aborted = true;
 
            _pipe.Reader.CancelPendingRead();
            _pipe.Writer.Complete();
        }
    }
 
    public Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
    {
        lock (_contextLock)
        {
            if (_completed || _aborted)
            {
                return Task.CompletedTask;
            }
 
            _pipe.Writer.Write(buffer.Span);
        }
 
        return FlushAsync(_pipe.Writer, cancellationToken);
    }
 
    private Task FlushAsync(PipeWriter pipeWriter, CancellationToken cancellationToken)
    {
        lock (_flushLock)
        {
            _lastFlushTask = _lastFlushTask.IsCompleted ?
                FlushNowAsync(pipeWriter, cancellationToken) :
                AwaitLastFlushAndThenFlushAsync(_lastFlushTask, pipeWriter, cancellationToken);
 
            return _lastFlushTask;
        }
    }
 
    private Task FlushNowAsync(PipeWriter pipeWriter, CancellationToken cancellationToken)
    {
        var awaitable = pipeWriter.FlushAsync(cancellationToken);
        return awaitable.IsCompleted ? Task.CompletedTask : FlushNowAsyncAwaited(awaitable, cancellationToken);
    }
 
    private async Task FlushNowAsyncAwaited(ValueTask<FlushResult> awaitable, CancellationToken cancellationToken)
    {
        try
        {
            await awaitable;
            cancellationToken.ThrowIfCancellationRequested();
        }
        catch (OperationCanceledException)
        {
            Abort();
        }
        catch
        {
            // A canceled token is the only reason flush should ever throw.
            Debug.Assert(false);
        }
    }
 
    private async Task AwaitLastFlushAndThenFlushAsync(Task lastFlushTask, PipeWriter pipeWriter, CancellationToken cancellationToken)
    {
        await lastFlushTask;
        await FlushNowAsync(pipeWriter, cancellationToken);
    }
}