|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace System.Threading.Channels
{
internal abstract class AsyncOperation
{
/// <summary>Sentinel object used in a field to indicate the operation is available for use.</summary>
protected static readonly Action<object?> s_availableSentinel = AvailableSentinel; // named method to help with debugging
private static void AvailableSentinel(object? s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(AvailableSentinel)} invoked with {s}");
/// <summary>Sentinel object used in a field to indicate the operation has completed</summary>
protected static readonly Action<object?> s_completedSentinel = CompletedSentinel; // named method to help with debugging
private static void CompletedSentinel(object? s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(CompletedSentinel)} invoked with {s}");
/// <summary>Throws an exception indicating that the operation's result was accessed before the operation completed.</summary>
protected static void ThrowIncompleteOperationException() =>
throw new InvalidOperationException(SR.InvalidOperation_IncompleteAsyncOperation);
/// <summary>Throws an exception indicating that multiple continuations can't be set for the same operation.</summary>
protected static void ThrowMultipleContinuations() =>
throw new InvalidOperationException(SR.InvalidOperation_MultipleContinuations);
/// <summary>Throws an exception indicating that the operation was used after it was supposed to be used.</summary>
protected static void ThrowIncorrectCurrentIdException() =>
throw new InvalidOperationException(SR.InvalidOperation_IncorrectToken);
}
/// <summary>The representation of an asynchronous operation that has a result value.</summary>
/// <typeparam name="TResult">Specifies the type of the result. May be <see cref="VoidResult"/>.</typeparam>
internal partial class AsyncOperation<TResult> : AsyncOperation, IValueTaskSource, IValueTaskSource<TResult>
{
/// <summary>Registration with a provided cancellation token.</summary>
private readonly CancellationTokenRegistration _registration;
/// <summary>true if this object is pooled and reused; otherwise, false.</summary>
/// <remarks>
/// If the operation is cancelable, then it can't be pooled. And if it's poolable, there must never be race conditions to complete it,
/// which is the main reason poolable objects can't be cancelable, as then cancellation could fire, the object could get reused,
/// and then we may end up trying to complete an object that's used by someone else.
/// </remarks>
private readonly bool _pooled;
/// <summary>Whether continuations should be forced to run asynchronously.</summary>
private readonly bool _runContinuationsAsynchronously;
/// <summary>Only relevant to cancelable operations; 0 if the operation hasn't had completion reserved, 1 if it has.</summary>
private volatile int _completionReserved;
/// <summary>The result of the operation.</summary>
private TResult? _result;
/// <summary>Any error that occurred during the operation.</summary>
private ExceptionDispatchInfo? _error;
/// <summary>The continuation callback.</summary>
/// <remarks>
/// This may be the completion sentinel if the operation has already completed.
/// This may be the available sentinel if the operation is being pooled and is available for use.
/// This may be null if the operation is pending.
/// This may be another callback if the operation has had a callback hooked up with OnCompleted.
/// </remarks>
private Action<object?>? _continuation;
/// <summary>State object to be passed to <see cref="_continuation"/>.</summary>
private object? _continuationState;
/// <summary>Scheduling context (a <see cref="SynchronizationContext"/> or <see cref="TaskScheduler"/>) to which to queue the continuation. May be null.</summary>
private object? _schedulingContext;
/// <summary>Execution context to use when invoking <see cref="_continuation"/>. May be null.</summary>
private ExecutionContext? _executionContext;
/// <summary>The token value associated with the current operation.</summary>
/// <remarks>
/// IValueTaskSource operations on this instance are only valid if the provided token matches this value,
/// which is incremented once GetResult is called to avoid multiple awaits on the same instance.
/// </remarks>
private short _currentId;
/// <summary>Initializes the interactor.</summary>
/// <param name="runContinuationsAsynchronously">true if continuations should be forced to run asynchronously; otherwise, false.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the operation.</param>
/// <param name="pooled">Whether this instance is pooled and reused.</param>
public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default, bool pooled = false)
{
_continuation = pooled ? s_availableSentinel : null;
_pooled = pooled;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
if (cancellationToken.CanBeCanceled)
{
Debug.Assert(!_pooled, "Cancelable operations can't be pooled");
CancellationToken = cancellationToken;
_registration = UnsafeRegister(cancellationToken, static s =>
{
var thisRef = (AsyncOperation<TResult>)s!;
thisRef.TrySetCanceled(thisRef.CancellationToken);
}, this);
}
}
/// <summary>Gets or sets the next operation in the linked list of operations.</summary>
public AsyncOperation<TResult>? Next { get; set; }
/// <summary>Gets the cancellation token associated with this operation.</summary>
public CancellationToken CancellationToken { get; }
/// <summary>Gets a <see cref="ValueTask"/> backed by this instance and its current token.</summary>
public ValueTask ValueTask => new ValueTask(this, _currentId);
/// <summary>Gets a <see cref="ValueTask{TResult}"/> backed by this instance and its current token.</summary>
public ValueTask<TResult> ValueTaskOfT => new ValueTask<TResult>(this, _currentId);
/// <summary>Gets the current status of the operation.</summary>
/// <param name="token">The token that must match <see cref="_currentId"/>.</param>
public ValueTaskSourceStatus GetStatus(short token)
{
if (_currentId != token)
{
ThrowIncorrectCurrentIdException();
}
return
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
}
/// <summary>Gets whether the operation has completed.</summary>
internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel);
/// <summary>Gets the result of the operation.</summary>
/// <param name="token">The token that must match <see cref="_currentId"/>.</param>
public TResult GetResult(short token)
{
if (_currentId != token)
{
ThrowIncorrectCurrentIdException();
}
if (!IsCompleted)
{
ThrowIncompleteOperationException();
}
ExceptionDispatchInfo? error = _error;
TResult? result = _result;
_currentId++;
if (_pooled)
{
Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data
}
error?.Throw();
return result!;
}
/// <summary>Gets the result of the operation.</summary>
/// <param name="token">The token that must match <see cref="_currentId"/>.</param>
void IValueTaskSource.GetResult(short token)
{
if (_currentId != token)
{
ThrowIncorrectCurrentIdException();
}
if (!IsCompleted)
{
ThrowIncompleteOperationException();
}
ExceptionDispatchInfo? error = _error;
_currentId++;
if (_pooled)
{
Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data
}
error?.Throw();
}
/// <summary>Attempts to take ownership of the pooled instance.</summary>
/// <returns>true if the instance is now owned by the caller, in which case its state has been reset; otherwise, false.</returns>
public bool TryOwnAndReset()
{
Debug.Assert(_pooled, "Should only be used for pooled objects");
if (ReferenceEquals(Interlocked.CompareExchange(ref _continuation, null, s_availableSentinel), s_availableSentinel))
{
_continuationState = null;
_result = default;
_error = null;
_schedulingContext = null;
_executionContext = null;
return true;
}
return false;
}
/// <summary>Hooks up a continuation callback for when the operation has completed.</summary>
/// <param name="continuation">The callback.</param>
/// <param name="state">The state to pass to the callback.</param>
/// <param name="token">The current token that must match <see cref="_currentId"/>.</param>
/// <param name="flags">Flags that influence the behavior of the callback.</param>
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (_currentId != token)
{
ThrowIncorrectCurrentIdException();
}
// We need to store the state before the CompareExchange, so that if it completes immediately
// after the CompareExchange, it'll find the state already stored. If someone misuses this
// and schedules multiple continuations erroneously, we could end up using the wrong state.
// Make a best-effort attempt to catch such misuse.
if (_continuationState != null)
{
ThrowMultipleContinuations();
}
_continuationState = state;
// Capture the execution context if necessary.
Debug.Assert(_executionContext == null);
if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
_executionContext = ExecutionContext.Capture();
}
// Capture the scheduling context if necessary.
Debug.Assert(_schedulingContext == null);
SynchronizationContext? sc = null;
TaskScheduler? ts = null;
if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
{
sc = SynchronizationContext.Current;
if (sc != null && sc.GetType() != typeof(SynchronizationContext))
{
_schedulingContext = sc;
}
else
{
sc = null;
ts = TaskScheduler.Current;
if (ts != TaskScheduler.Default)
{
_schedulingContext = ts;
}
}
}
// Try to set the provided continuation into _continuation. If this succeeds, that means the operation
// has not yet completed, and the completer will be responsible for invoking the callback. If this fails,
// that means the operation has already completed, and we must invoke the callback, but because we're still
// inside the awaiter's OnCompleted method and we want to avoid possible stack dives, we must invoke
// the continuation asynchronously rather than synchronously.
Action<object?>? prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prevContinuation != null)
{
// If the set failed because there's already a delegate in _continuation, but that delegate is
// something other than s_completedSentinel, something went wrong, which should only happen if
// the instance was erroneously used, likely to hook up multiple continuations.
Debug.Assert(IsCompleted, $"Expected IsCompleted");
if (!ReferenceEquals(prevContinuation, s_completedSentinel))
{
Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel.");
ThrowMultipleContinuations();
}
// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.
if (_schedulingContext == null)
{
if (_executionContext == null)
{
UnsafeQueueUserWorkItem(continuation, state);
}
else
{
QueueUserWorkItem(continuation, state);
}
}
else if (sc != null)
{
sc.Post(static s =>
{
var t = (KeyValuePair<Action<object?>, object?>)s!;
t.Key(t.Value);
}, new KeyValuePair<Action<object?>, object?>(continuation, state));
}
else
{
Debug.Assert(ts != null);
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
}
}
}
/// <summary>Unregisters from cancellation and returns whether cancellation already started.</summary>
/// <returns>
/// true if either the instance wasn't cancelable or cancellation successfully unregistered without cancellation having started.
/// false if cancellation successfully unregistered after cancellation was initiated.
/// </returns>
/// <remarks>
/// This is important for two reasons:
/// 1. To avoid leaking a registration into a token, so it must be done prior to completing the operation.
/// 2. To avoid having to worry about concurrent completion; once invoked, the caller can be guaranteed
/// that no one else will try to complete the operation (assuming the caller is properly constructed
/// and themselves guarantees only a single completer other than through cancellation).
/// </remarks>
public bool UnregisterCancellation()
{
if (CancellationToken.CanBeCanceled)
{
_registration.Dispose(); // Dispose rather than Unregister is important to know work has quiesced
return _completionReserved == 0;
}
Debug.Assert(_registration == default);
return true;
}
/// <summary>Completes the operation with a success state and the specified result.</summary>
/// <param name="item">The result value.</param>
/// <returns>true if the operation could be successfully transitioned to a completed state; false if it was already completed.</returns>
public bool TrySetResult(TResult item)
{
UnregisterCancellation();
if (TryReserveCompletionIfCancelable())
{
_result = item;
SignalCompletion();
return true;
}
return false;
}
/// <summary>Completes the operation with a failed state and the specified error.</summary>
/// <param name="exception">The error.</param>
/// <returns>true if the operation could be successfully transitioned to a completed state; false if it was already completed.</returns>
public bool TrySetException(Exception exception)
{
UnregisterCancellation();
if (TryReserveCompletionIfCancelable())
{
_error = ExceptionDispatchInfo.Capture(exception);
SignalCompletion();
return true;
}
return false;
}
/// <summary>Completes the operation with a failed state and a cancellation error.</summary>
/// <param name="cancellationToken">The cancellation token that caused the cancellation.</param>
/// <returns>true if the operation could be successfully transitioned to a completed state; false if it was already completed.</returns>
public bool TrySetCanceled(CancellationToken cancellationToken = default)
{
if (TryReserveCompletionIfCancelable())
{
_error = ExceptionDispatchInfo.Capture(new OperationCanceledException(cancellationToken));
SignalCompletion();
return true;
}
return false;
}
/// <summary>Attempts to reserve this instance for completion.</summary>
/// <remarks>
/// This will always return true for non-cancelable objects, as they only ever have a single owner
/// responsible for completion. For cancelable operations, this will attempt to atomically transition
/// from Initialized to CompletionReserved.
/// </remarks>
private bool TryReserveCompletionIfCancelable() =>
!CancellationToken.CanBeCanceled ||
Interlocked.CompareExchange(ref _completionReserved, 1, 0) == 0;
/// <summary>Signals to a registered continuation that the operation has now completed.</summary>
private void SignalCompletion()
{
if (_continuation != null || Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) != null)
{
Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel.");
Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel.");
if (_schedulingContext == null)
{
// There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously)
{
UnsafeQueueSetCompletionAndInvokeContinuation();
return;
}
}
else if (_schedulingContext is SynchronizationContext sc)
{
// There's a captured synchronization context. If we're forced to run continuations asynchronously,
// or if there's a current synchronization context that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current)
{
sc.Post(static s => ((AsyncOperation<TResult>)s!).SetCompletionAndInvokeContinuation(), this);
return;
}
}
else
{
// There's a captured TaskScheduler. If we're forced to run continuations asynchronously,
// or if there's a current scheduler that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
TaskScheduler ts = (TaskScheduler)_schedulingContext;
Debug.Assert(ts != null, "Expected a TaskScheduler");
if (_runContinuationsAsynchronously || ts != TaskScheduler.Current)
{
Task.Factory.StartNew(static s => ((AsyncOperation<TResult>)s!).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
return;
}
}
// Invoke the continuation synchronously.
SetCompletionAndInvokeContinuation();
}
}
private void SetCompletionAndInvokeContinuation()
{
if (_executionContext == null)
{
Action<object?> c = _continuation!;
_continuation = s_completedSentinel;
c(_continuationState);
}
else
{
ExecutionContext.Run(_executionContext, static s =>
{
var thisRef = (AsyncOperation<TResult>)s!;
Action<object?> c = thisRef._continuation!;
thisRef._continuation = s_completedSentinel;
c(thisRef._continuationState);
}, this);
}
}
}
/// <summary>The representation of an asynchronous operation that has a result value and carries additional data with it.</summary>
/// <typeparam name="TData">Specifies the type of data being written.</typeparam>
internal sealed class VoidAsyncOperationWithData<TData> : AsyncOperation<VoidResult>
{
/// <summary>Initializes the interactor.</summary>
/// <param name="runContinuationsAsynchronously">true if continuations should be forced to run asynchronously; otherwise, false.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the operation.</param>
/// <param name="pooled">Whether this instance is pooled and reused.</param>
public VoidAsyncOperationWithData(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default, bool pooled = false) :
base(runContinuationsAsynchronously, cancellationToken, pooled)
{
}
/// <summary>The item being written.</summary>
public TData? Item { get; set; }
}
}
|