// File: System\IO\Pipelines\PipeAwaitable.cs
// Web Access
// Project: src\src\libraries\System.IO.Pipelines\src\System.IO.Pipelines.csproj (System.IO.Pipelines)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks.Sources;
 
namespace System.IO.Pipelines
{
    /// <summary>
    /// Tracks the state of one awaitable pipe operation: its completed/running/canceled flags,
    /// the continuation registered through <see cref="OnCompleted"/>, any captured scheduling
    /// context, and the cancellation registration made in <see cref="BeginOperation"/>.
    /// </summary>
    /// <remarks>
    /// This is a mutable struct and it performs no synchronization of its own.
    /// NOTE(review): the owning type presumably serializes access (e.g. under a lock) - confirm against callers.
    /// </remarks>
    [DebuggerDisplay("AwaitableState = {_awaitableState}, IsCompleted = {IsCompleted}")]
    internal struct PipeAwaitable
    {
        // Bit set of AwaitableState flags describing the current operation.
        private AwaitableState _awaitableState;
        // Continuation registered by OnCompleted, and the state object handed to it.
        private Action<object?>? _completion;
        private object? _completionState;
        // It's rare to have to capture custom context here, so it is allocated lazily in OnCompleted.
        private SchedulingContext? _schedulingContext;
        // Registration returned by the token in BeginOperation; default when nothing is registered.
        private CancellationTokenRegistration _cancellationTokenRegistration;

#if (!NETSTANDARD2_0 && !NETFRAMEWORK)
        // On these targets the token is read back from the registration itself.
        private CancellationToken CancellationToken => _cancellationTokenRegistration.Token;
#else
        // On these targets the token is tracked in a separate field, kept alongside the registration.
        private CancellationToken _cancellationToken;
        private CancellationToken CancellationToken => _cancellationToken;
#endif

        /// <summary>
        /// Creates an awaitable with no continuation, no captured context and no cancellation registration.
        /// </summary>
        /// <param name="completed">When true the awaitable starts with the Completed flag set.</param>
        /// <param name="useSynchronizationContext">
        /// When true, <see cref="OnCompleted"/> is allowed to capture the current <see cref="SynchronizationContext"/>.
        /// </param>
        public PipeAwaitable(bool completed, bool useSynchronizationContext)
        {
            _awaitableState = (completed ? AwaitableState.Completed : AwaitableState.None) |
                              (useSynchronizationContext ? AwaitableState.UseSynchronizationContext : AwaitableState.None);
            _completion = null;
            _completionState = null;
            _cancellationTokenRegistration = default;
            _schedulingContext = null;
#if (NETSTANDARD2_0 || NETFRAMEWORK)
            _cancellationToken = CancellationToken.None;
#endif
        }

        /// <summary>True when either the Completed or the Canceled flag is set.</summary>
        public bool IsCompleted => (_awaitableState & (AwaitableState.Completed | AwaitableState.Canceled)) != 0;

        /// <summary>True when the Running flag is set (between <see cref="BeginOperation"/> and <see cref="ObserveCancellation"/>).</summary>
        public bool IsRunning => (_awaitableState & AwaitableState.Running) != 0;

        /// <summary>
        /// Marks the operation as running and, when the token can be canceled and the awaitable is not
        /// already completed, registers <paramref name="callback"/> with <paramref name="cancellationToken"/>.
        /// </summary>
        /// <param name="cancellationToken">Token to observe for the duration of the operation.</param>
        /// <param name="callback">Callback registered with the token.</param>
        /// <param name="state">State object passed to <paramref name="callback"/>.</param>
        /// <exception cref="OperationCanceledException">
        /// Thrown inline when registration returned a default registration and the token is already canceled.
        /// In that case the Running flag is not set.
        /// </exception>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
        {
            // Don't register if already completed; the registration would be immediately unregistered in ObserveCancellation.
            if (cancellationToken.CanBeCanceled && !IsCompleted)
            {
#if DEBUG
                var previousAwaitableState = _awaitableState;
#endif

                _cancellationTokenRegistration = cancellationToken.UnsafeRegister(callback, state);

                // If we get back the default CancellationTokenRegistration then it means the
                // callback ran synchronously and we can throw inline. This is safe because we haven't changed
                // the state of the awaitable as yet.
                if (_cancellationTokenRegistration == default(CancellationTokenRegistration))
                {
#if DEBUG
                    Debug.Assert(previousAwaitableState == _awaitableState, "The awaitable state changed!");
#endif

                    cancellationToken.ThrowIfCancellationRequested();
                }

#if (NETSTANDARD2_0 || NETFRAMEWORK)
                // Deliberately stored only after registration: while the callback runs synchronously the
                // CancellationToken property still reports the default token, so CancellationTokenFired
                // leaves the state untouched (which is what the assert above relies on).
                // NOTE(review): assumes the registered callback is routed to CancellationTokenFired - confirm in the caller.
                _cancellationToken = cancellationToken;
#endif
            }

            _awaitableState |= AwaitableState.Running;
        }

        /// <summary>
        /// Sets the Completed flag and hands back any pending continuation so the caller can run it.
        /// </summary>
        /// <param name="completionData">
        /// The continuation and captured contexts that were pending, or <c>default</c> when none was registered.
        /// </param>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Complete(out CompletionData completionData)
        {
            ExtractCompletion(out completionData);

            _awaitableState |= AwaitableState.Completed;
        }

        /// <summary>
        /// Moves the registered continuation, its state and any captured contexts into
        /// <paramref name="completionData"/> and clears the corresponding fields.
        /// </summary>
        /// <param name="completionData"><c>default</c> when no continuation was registered.</param>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void ExtractCompletion(out CompletionData completionData)
        {
            Action<object?>? currentCompletion = _completion;
            object? currentState = _completionState;
            SchedulingContext? schedulingContext = _schedulingContext;
            ExecutionContext? executionContext = schedulingContext?.ExecutionContext;
            SynchronizationContext? synchronizationContext = schedulingContext?.SynchronizationContext;

            // Clear the fields so the same continuation cannot be handed out twice.
            _completion = null;
            _completionState = null;
            _schedulingContext = null;

            completionData = currentCompletion is not null ?
                new CompletionData(currentCompletion, currentState, executionContext, synchronizationContext) :
                default;
        }

        /// <summary>
        /// Clears the Completed flag. No continuation or scheduling context may be pending when this is called
        /// (checked by debug assertions only).
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetUncompleted()
        {
            Debug.Assert(_completion is null);
            Debug.Assert(_completionState is null);
            Debug.Assert(_schedulingContext is null);

            _awaitableState &= ~AwaitableState.Completed;
        }

        /// <summary>
        /// Registers <paramref name="continuation"/> to be handed out by a later <see cref="Complete"/> or
        /// <see cref="Cancel"/>, capturing scheduling/execution context as requested by <paramref name="flags"/>.
        /// </summary>
        /// <param name="continuation">Continuation to store.</param>
        /// <param name="state">State object passed to <paramref name="continuation"/>.</param>
        /// <param name="flags">Flags controlling whether the scheduling context and execution context are captured.</param>
        /// <param name="completionData">
        /// Set to a value wrapping <paramref name="continuation"/> when the awaitable is already completed or a
        /// continuation was already registered; otherwise <c>default</c>.
        /// </param>
        /// <param name="doubleCompletion">True when a continuation was already registered before this call.</param>
        public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
        {
            completionData = default;
            doubleCompletion = _completion is not null;

            if (IsCompleted || doubleCompletion)
            {
                // Nothing is stored in this case: the new continuation is returned to the caller together with
                // whatever contexts are currently captured (none unless an earlier registration captured them).
                completionData = new CompletionData(continuation, state, _schedulingContext?.ExecutionContext, _schedulingContext?.SynchronizationContext);
                return;
            }

            _completion = continuation;
            _completionState = state;

            // Capture the SynchronizationContext if there's any and we're allowing capture (from pipe options)
            if ((_awaitableState & AwaitableState.UseSynchronizationContext) != 0 &&
                (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                SynchronizationContext? sc = SynchronizationContext.Current;
                // An instance of the base SynchronizationContext type itself is not captured; only derived types are.
                if (sc is not null && sc.GetType() != typeof(SynchronizationContext))
                {
                    _schedulingContext ??= new SchedulingContext();
                    _schedulingContext.SynchronizationContext = sc;
                }
            }

            // Capture the execution context
            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                _schedulingContext ??= new SchedulingContext();
                _schedulingContext.ExecutionContext = ExecutionContext.Capture();
            }
        }

        /// <summary>
        /// Sets the Canceled flag and hands back any pending continuation so the caller can run it.
        /// </summary>
        /// <param name="completionData">
        /// The continuation and captured contexts that were pending, or <c>default</c> when none was registered.
        /// </param>
        public void Cancel(out CompletionData completionData)
        {
            ExtractCompletion(out completionData);

            _awaitableState |= AwaitableState.Canceled;
        }

        /// <summary>
        /// Cancels the awaitable if the currently tracked token has cancellation requested; otherwise does nothing.
        /// </summary>
        /// <param name="completionData">
        /// The pending continuation when the awaitable was canceled by this call; otherwise <c>default</c>.
        /// </param>
        public void CancellationTokenFired(out CompletionData completionData)
        {
            // We might be getting stale callbacks that we already unsubscribed from
            if (CancellationToken.IsCancellationRequested)
            {
                Cancel(out completionData);
            }
            else
            {
                completionData = default;
            }
        }

        /// <summary>
        /// Reports whether the Canceled flag was set and clears both the Canceled and Running flags.
        /// </summary>
        /// <returns>True when the operation had been canceled.</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool ObserveCancellation()
        {
            bool isCanceled = (_awaitableState & AwaitableState.Canceled) == AwaitableState.Canceled;

            _awaitableState &= ~(AwaitableState.Canceled | AwaitableState.Running);

            return isCanceled;
        }

        /// <summary>
        /// Returns the current cancellation registration and token, and resets the stored registration
        /// (and the separately tracked token, where one exists) to default.
        /// </summary>
        /// <param name="cancellationToken">The token that was being tracked before the reset.</param>
        /// <returns>The registration that was stored before the reset.</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public CancellationTokenRegistration ReleaseCancellationTokenRegistration(out CancellationToken cancellationToken)
        {
            // Read the token first: on some targets it is derived from the registration that is reset below.
            cancellationToken = CancellationToken;
            CancellationTokenRegistration cancellationTokenRegistration = _cancellationTokenRegistration;

#if (NETSTANDARD2_0 || NETFRAMEWORK)
            _cancellationToken = default;
#endif
            _cancellationTokenRegistration = default;

            return cancellationTokenRegistration;
        }

        [Flags]
        private enum AwaitableState
        {
            None = 0,
            // Marks that the logical operation (backpressure/waiting for data) is completed. Set in Complete, reset in SetUncompleted
            Completed = 1,
            // Marks that the operation is running. Set in BeginOperation (*Async), reset in ObserveCancellation (GetResult)
            Running = 2,
            // Marks that the operation is canceled. Set in Cancel, reset in ObserveCancellation (GetResult)
            Canceled = 4,
            // Allows OnCompleted to capture SynchronizationContext.Current. Set only in the constructor
            UseSynchronizationContext = 8
        }

        // Holder for the contexts captured in OnCompleted; allocated only when something is captured.
        private sealed class SchedulingContext
        {
            public SynchronizationContext? SynchronizationContext { get; set; }
            public ExecutionContext? ExecutionContext { get; set; }
        }
    }
}