File: RemoteCallback.cs
Web Access
Project: src\src\Workspaces\Remote\Core\Microsoft.CodeAnalysis.Remote.Workspaces.csproj (Microsoft.CodeAnalysis.Remote.Workspaces)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.ServiceHub.Framework;
using StreamJsonRpc;
 
namespace Microsoft.CodeAnalysis.Remote;
 
/// <summary>
/// Wraps calls from a remote brokered service back to the client or to an in-proc brokered service.
/// The purpose of this type is to handle exceptions thrown by the underlying remoting infrastructure
/// in a manner that's compatible with our exception handling policies.
/// </summary>
/// <typeparam name="T">The callback (proxy) interface the invocations are dispatched to.</typeparam>
internal readonly struct RemoteCallback<T>
    where T : class
{
    /// <summary>
    /// The callback object every <c>InvokeAsync</c> overload passes to the supplied invocation delegate.
    /// </summary>
    private readonly T _callback;

    /// <summary>
    /// Creates a wrapper around <paramref name="callback"/>.
    /// </summary>
    /// <param name="callback">The object the invocation delegates will be called with.</param>
    public RemoteCallback(T callback)
    {
        _callback = callback;
    }

    /// <summary>
    /// Use to perform a callback from ServiceHub process to an arbitrary brokered service hosted in the original process (usually devenv).
    /// </summary>
    /// <param name="client">The broker client used to rent a proxy for the service.</param>
    /// <param name="serviceDescriptor">Descriptor of the service to rent a proxy for.</param>
    /// <param name="invocation">The work to perform with a <see cref="RemoteCallback{T}"/> wrapping the rented proxy.</param>
    /// <param name="cancellationToken">A cancellation token the operation will observe.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="client"/> has already been disposed: either via
    /// <paramref name="cancellationToken"/> (if cancellation was requested) or as an
    /// <see cref="OperationCanceledIgnoringCallerTokenException"/> wrapping the <see cref="ObjectDisposedException"/>.
    /// </exception>
    public static async ValueTask InvokeServiceAsync(
        ServiceBrokerClient client,
        ServiceRpcDescriptor serviceDescriptor,
        Func<RemoteCallback<T>, CancellationToken, ValueTask> invocation,
        CancellationToken cancellationToken)
    {
        ServiceBrokerClient.Rental<T> rental;
        try
        {
            rental = await client.GetProxyAsync<T>(serviceDescriptor, cancellationToken).ConfigureAwait(false);
        }
        catch (ObjectDisposedException e)
        {
            // When a connection is dropped ServiceHub's ServiceManager disposes the brokered service, which in turn disposes the ServiceBrokerClient.
            cancellationToken.ThrowIfCancellationRequested();
            throw new OperationCanceledIgnoringCallerTokenException(e);
        }

        // The rental is disposable and must be handed back to the ServiceBrokerClient once we are done with the
        // proxy; otherwise it stays rented forever. Dispose it regardless of whether the invocation succeeds,
        // throws, or the proxy turns out to be unavailable.
        using (rental)
        {
            Contract.ThrowIfNull(rental.Proxy);
            var callback = new RemoteCallback<T>(rental.Proxy);

            await invocation(callback, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Invokes API on the callback object hosted in the original process (usually devenv) associated with the currently executing brokered service hosted in ServiceHub process.
    /// </summary>
    /// <param name="invocation">The call to make on the callback object.</param>
    /// <param name="cancellationToken">A cancellation token passed to <paramref name="invocation"/>.</param>
    /// <remarks>
    /// Exceptions for which <see cref="ReportUnexpectedException"/> returns <see langword="true"/> are rethrown
    /// wrapped in <see cref="OperationCanceledIgnoringCallerTokenException"/>; all others propagate unchanged.
    /// </remarks>
    public async ValueTask InvokeAsync(Func<T, CancellationToken, ValueTask> invocation, CancellationToken cancellationToken)
    {
        try
        {
            await invocation(_callback, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
        {
            throw new OperationCanceledIgnoringCallerTokenException(exception);
        }
    }

    /// <summary>
    /// Invokes API on the callback object hosted in the original process (usually devenv) associated with the currently executing brokered service hosted in ServiceHub process.
    /// </summary>
    /// <typeparam name="TResult">Type of the value produced by <paramref name="invocation"/>.</typeparam>
    /// <param name="invocation">The call to make on the callback object.</param>
    /// <param name="cancellationToken">A cancellation token passed to <paramref name="invocation"/>.</param>
    /// <returns>The value returned by <paramref name="invocation"/>.</returns>
    /// <remarks>
    /// Exceptions for which <see cref="ReportUnexpectedException"/> returns <see langword="true"/> are rethrown
    /// wrapped in <see cref="OperationCanceledIgnoringCallerTokenException"/>; all others propagate unchanged.
    /// </remarks>
    public async ValueTask<TResult> InvokeAsync<TResult>(Func<T, CancellationToken, ValueTask<TResult>> invocation, CancellationToken cancellationToken)
    {
        try
        {
            return await invocation(_callback, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
        {
            throw new OperationCanceledIgnoringCallerTokenException(exception);
        }
    }

    /// <summary>
    /// Invokes API on the callback object hosted in the original process (usually devenv) associated with the
    /// currently executing brokered service hosted in ServiceHub process. The API streams results back to the
    /// caller.
    /// </summary>
    /// <param name="invocation">A callback to asynchronously write data. The callback should always <see
    /// cref="PipeWriter.Complete"/> the <see cref="PipeWriter"/>.  If it does not then reading will hang.</param>
    /// <param name="reader">A callback to asynchronously read data. The callback should not complete the <see
    /// cref="PipeReader"/>, but no harm will happen if it does.</param>
    /// <param name="cancellationToken">A cancellation token the operation will observe.</param>
    public async ValueTask InvokeAsync(
        Func<T, PipeWriter, CancellationToken, ValueTask> invocation,
        Func<PipeReader, CancellationToken, ValueTask> reader,
        CancellationToken cancellationToken)
    {
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            var pipe = new Pipe();

            // Kick off the work to do the writing to the pipe asynchronously.  It will start hot and will be able
            // to do work as the reading side attempts to pull in the data it is writing.

            var writeTask = WriteAsync(_callback, pipe.Writer);
            var readTask = ReadAsync(pipe.Reader);

            // Note: waiting on the write-task is not strictly necessary.  The read-task cannot complete unless
            // the write-task completes (or it faults for some reason).  However, it's nice and clean to just not
            // use fire-and-forget here and avoids us having to consider things like async-tracking-tokens for
            // testing purposes.
            await Task.WhenAll(writeTask, readTask).ConfigureAwait(false);

            // Reaching this line means both tasks already ran to completion, so this await finishes immediately.
            await readTask.ConfigureAwait(false);
        }
        catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
        {
            throw new OperationCanceledIgnoringCallerTokenException(exception);
        }

        return;

        // Runs the caller-supplied writer against the pipe, completing the writer only if the writer faulted.
        async Task WriteAsync(T service, PipeWriter pipeWriter)
        {
            Exception? exception = null;
            try
            {
                // Intentionally yield this thread so that the caller can proceed concurrently and start reading.
                // This is not strictly necessary (as we know the writer will always call FlushAsync()), but it is nice
                // as it allows both to proceed concurrently on the initial writing/reading.
                await Task.Yield();

                await invocation(service, pipeWriter, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when ((exception = ex) == null)
            {
                // The filter only exists to record the exception for the finally block below; it is never true
                // (a caught exception is never null), so the exception keeps propagating and this body never runs.
                throw ExceptionUtilities.Unreachable();
            }
            finally
            {
                // Absolutely do not Complete/CompleteAsync the writer here *unless* an exception occurred.  The
                // writer is passed to StreamJsonRPC which takes ownership of it.  The *inside* of that rpc is
                // responsible for Completing the writer *it* is passed, which will signal the completion of the
                // writer we have here.
                //
                // We *do* need to complete this writer in the event of an exception as that may have happened
                // *prior* to even issuing the rpc.  If we don't complete the writer we will hang.  If the exception
                // happened within the RPC the writer may already be completed, but it's fine for us to complete it
                // a second time.
                //
                // The reason it is *not* fine for us to complete the writer in a non-exception event is that it is
                // legal (and is the case in practice) that the code in StreamJsonRPC may still be using it (see
                // https://github.com/AArnott/Nerdbank.Streams/blob/dafeb5846702bc29e261c9ddf60f42feae01654c/src/Nerdbank.Streams/PipeExtensions.cs#L428)
                // where the writer may be advanced in an independent Task even once the rpc message has returned to
                // the caller (us).
                //
                // NOTE: it is intentional that the try/catch pattern here does NOT match the one in ReadAsync.  There
                // are very different semantics around each.  The writer code passes ownership to StreamJsonRPC, while
                // the reader code does not.  As such, the reader code is responsible for completing the reader in all
                // cases, whereas the writer code only completes when faulting.

                // DO NOT REMOVE THIS NULL CHECK WITHOUT DEEP AND CAREFUL REVIEW.
                if (exception != null)
                    await pipeWriter.CompleteAsync(exception).ConfigureAwait(false);
            }
        }

        // Runs the caller-supplied reader against the pipe and always completes the reader afterwards.
        async Task ReadAsync(PipeReader pipeReader)
        {
            // NOTE: it is intentional that the try/catch pattern here does NOT match the one in WriteAsync.  There
            // are very different semantics around each.  The writer code passes ownership to StreamJsonRPC, while
            // the reader code does not.  As such, the reader code is responsible for completing the reader in all
            // cases, whereas the writer code only completes when faulting.

            Exception? exception = null;
            try
            {
                await reader(pipeReader, cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (Exception ex) when ((exception = ex) == null)
            {
                // As in WriteAsync: the filter just captures the exception for the finally block and is never true.
                throw ExceptionUtilities.Unreachable();
            }
            finally
            {
                // ensure we always complete the reader so the pipe can clean up all its resources. in the case of
                // an exception, attempt to complete the reader with that as well as that will tear down the writer
                // allowing it to stop writing and allowing the pipe to be cleaned up.
                await pipeReader.CompleteAsync(exception).ConfigureAwait(false);
            }
        }
    }

    // Remote calls can only throw 4 types of exceptions that correspond to
    //
    //   1) Connection issue (connection dropped for any reason)
    //   2) Serialization issue - bug in serialization of arguments (types are not serializable, etc.)
    //   3) Remote exception - an exception was thrown by the callee
    //   4) Cancellation
    //
    /// <summary>
    /// Exception filter used by the <c>InvokeAsync</c> overloads to decide whether <paramref name="exception"/>
    /// should be caught and converted to <see cref="OperationCanceledIgnoringCallerTokenException"/>.
    /// </summary>
    /// <param name="exception">The exception thrown by the remote invocation.</param>
    /// <param name="cancellationToken">The token the invocation was made with; used to tell requested
    /// cancellation apart from unexpected cancellation.</param>
    /// <returns>
    /// <see langword="true"/> if the caller should catch and convert the exception; <see langword="false"/> if
    /// the exception should propagate as is. <see cref="IOException"/> and requested cancellation yield
    /// <see langword="false"/>; <see cref="ConnectionLostException"/> yields <see langword="true"/>; the remaining
    /// cases return whatever the corresponding <c>FatalError</c> reporting helper returns.
    /// </returns>
    private static bool ReportUnexpectedException(Exception exception, CancellationToken cancellationToken)
    {
        if (exception is IOException)
        {
            // propagate intermittent exceptions without reporting telemetry:
            return false;
        }

        if (exception is OperationCanceledException)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                // Cancellation was requested and expected
                return false;
            }

            // Log unexpected state where a cancellation exception occurs without being requested. This will return
            // 'true' and caller will convert this to an acceptable cancellation exception that won't cause a second
            // NFW.
            return FatalError.ReportAndCatch(exception);
        }

        // When a connection is dropped we can see ConnectionLostException even though
        // CancelLocallyInvokedMethodsWhenConnectionIsClosed is set. That's because there might be a delay between
        // the JsonRpc detecting the disconnect and the call attempting to send a message. Catch the
        // ConnectionLostException exception here and convert it to OperationCanceledException.
        if (exception is ConnectionLostException)
            return true;

        // Indicates bug on client side or in serialization, report NFW and propagate the exception.
        return FatalError.ReportAndPropagate(exception);
    }
}