|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using Microsoft.ML.EntryPoints;
namespace Microsoft.ML.Runtime;
/// <summary>
/// Instances of this class are used to set up a bundle of named delegates. These
/// delegates are registered through <see cref="Register{TRet}"/> and its overloads.
/// Once all registrations are done, <see cref="Publish"/> is called and a message
/// of type <see cref="Bundle"/> is handed to every listener that acknowledged this
/// channel through <see cref="IPendingBundleNotification.Acknowledge"/>. The intended
/// use case is that any information surfaced through these delegates will be
/// published in some fashion, with the target scenario being that the library will
/// publish some sort of RESTful API.
/// </summary>
[BestFriend]
internal sealed class ServerChannel : ServerChannel.IPendingBundleNotification, IDisposable
{
    // See ServerChannel.md for a more elaborate discussion of high level usage and design.

    // Port written into the reflected "Port" field of the default ("mini") server factory.
    private const int DefaultServerPort = 12345;

    private readonly IChannelProvider _chp;
    private readonly string _identifier;

    // This holds the running collection of named delegates, if any. The dictionary itself
    // is lazily initialized only when a listener calls Acknowledge; it stays null (and this
    // channel stays a no-op) when nobody is listening.
    private Dictionary<string, Delegate> _toPublish;

    // Combined callback of every listener that called Acknowledge. Null exactly when
    // _toPublish is null.
    private Action<Bundle> _onPublish;

    // The bundle handed to listeners by Publish, or null if Publish has not yet run.
    private Bundle _published;
    private bool _disposed;

    /// <summary>
    /// Returns either this object, or <c>null</c> if there are no listeners on this server
    /// channel. A caller holding the result can use the <c>?.</c> operator so that calls to
    /// <see cref="Register{TRet}"/> and <see cref="Publish"/> are skipped entirely when
    /// nobody is listening.
    /// </summary>
    private ServerChannel ThisIfActiveOrNull => _toPublish == null ? null : this;

    /// <summary>
    /// Creates an inactive server channel. It only becomes active once a listener calls
    /// <see cref="Acknowledge"/>.
    /// </summary>
    /// <param name="provider">The channel provider used for contract checks.</param>
    /// <param name="identifier">The identifier that will be exposed on the published bundle.</param>
    private ServerChannel(IChannelProvider provider, string identifier)
    {
        Contracts.AssertValue(provider);
        _chp = provider;
        _chp.AssertNonWhiteSpace(identifier);
        _identifier = identifier;
    }

    /// <summary>
    /// Starts a new server channel.
    /// </summary>
    /// <param name="provider">The channel provider, on which to send
    /// the notification that a server is being constructed</param>
    /// <param name="identifier">A semi-unique identifier for this
    /// "bundle" that is being constructed</param>
    /// <returns>The constructed server channel, or <c>null</c> if there
    /// were no listeners for server channels registered on <paramref name="provider"/></returns>
    public static ServerChannel Start(IChannelProvider provider, string identifier)
    {
        Contracts.CheckValue(provider, nameof(provider));
        provider.CheckNonWhiteSpace(identifier, nameof(identifier));

        using (var pipe = provider.StartPipe<IPendingBundleNotification>("Server"))
        {
            var sc = new ServerChannel(provider, identifier);
            // Listeners on the pipe respond by calling Acknowledge on the channel, which
            // is what flips it from inactive to active.
            pipe.Send(sc);
            return sc.ThisIfActiveOrNull;
        }
    }

    /// <summary>
    /// Disposes the channel. If a bundle was published, the done actions that listeners
    /// attached through <see cref="Bundle.AddDoneAction"/> are invoked. Subsequent calls
    /// are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
            return;

        _disposed = true;
        // Done is null when no listener attached a done action; invoking it directly
        // would throw a NullReferenceException in that case.
        _published?.Done?.Invoke();
    }

    /// <summary>
    /// Shared implementation behind the <c>Register</c> overloads. Callers must already
    /// have established that the channel is active.
    /// </summary>
    /// <param name="name">The name under which the delegate is exposed. A name that was
    /// already registered makes the underlying dictionary throw <see cref="ArgumentException"/>.</param>
    /// <param name="func">The delegate to expose.</param>
    private void RegisterCore(string name, Delegate func)
    {
        _chp.CheckNonEmpty(name, nameof(name));
        _chp.CheckValue(func, nameof(func));
        _chp.Check(_published == null, "Cannot expose more interfaces once a server channel has been published");
        _chp.AssertValue(_toPublish);
        _toPublish.Add(name, func);
    }

    /// <summary>
    /// Registers a parameterless named delegate. Does nothing if no listener has
    /// acknowledged this channel.
    /// </summary>
    /// <param name="name">The name under which the delegate is exposed.</param>
    /// <param name="func">The delegate to expose.</param>
    public void Register<TRet>(string name, Func<TRet> func)
    {
        if (_toPublish != null)
            RegisterCore(name, func);
    }

    /// <summary>
    /// Registers a one-argument named delegate. Does nothing if no listener has
    /// acknowledged this channel.
    /// </summary>
    /// <param name="name">The name under which the delegate is exposed.</param>
    /// <param name="func">The delegate to expose.</param>
    public void Register<T1, TRet>(string name, Func<T1, TRet> func)
    {
        if (_toPublish != null)
            RegisterCore(name, func);
    }

    /// <summary>
    /// Registers a two-argument named delegate. Does nothing if no listener has
    /// acknowledged this channel.
    /// </summary>
    /// <param name="name">The name under which the delegate is exposed.</param>
    /// <param name="func">The delegate to expose.</param>
    public void Register<T1, T2, TRet>(string name, Func<T1, T2, TRet> func)
    {
        if (_toPublish != null)
            RegisterCore(name, func);
    }

    /// <summary>
    /// Registers a three-argument named delegate. Does nothing if no listener has
    /// acknowledged this channel.
    /// </summary>
    /// <param name="name">The name under which the delegate is exposed.</param>
    /// <param name="func">The delegate to expose.</param>
    public void Register<T1, T2, T3, TRet>(string name, Func<T1, T2, T3, TRet> func)
    {
        if (_toPublish != null)
            RegisterCore(name, func);
    }

    /// <summary>
    /// Finalizes all registrations of delegates, and hands the bundle of objects
    /// in a <see cref="Bundle"/> to the callbacks supplied by any listeners. Does
    /// nothing if no listener has acknowledged this channel. A channel can be
    /// published only once.
    /// </summary>
    public void Publish()
    {
        _chp.Assert((_toPublish == null) == (_onPublish == null));
        if (_toPublish == null)
            return;

        _chp.Check(_published == null, "Cannot republish once a server channel has been published");
        _published = new Bundle(this);
        _onPublish(_published);
    }

    /// <summary>
    /// Called by a listener to register interest in the bundle this channel will publish.
    /// The first call activates the channel by creating the delegate dictionary; every call
    /// appends <paramref name="toDo"/> to the callbacks run by <see cref="Publish"/>.
    /// </summary>
    /// <param name="toDo">The callback to perform when <see cref="Publish"/> is called.</param>
    public void Acknowledge(Action<Bundle> toDo)
    {
        _chp.CheckValue(toDo, nameof(toDo));
        _chp.Assert((_onPublish == null) == (_toPublish == null));
        if (_toPublish == null)
            _toPublish = new Dictionary<string, Delegate>();
        _onPublish += toDo;
        _chp.AssertValue(_onPublish);
    }

    /// <summary>
    /// Entry point factory for creating <see cref="IServer"/> instances.
    /// </summary>
    [TlcModule.ComponentKind("Server")]
    public interface IServerFactory : IComponentFactory<IChannel, IServer>
    {
        new IServer CreateComponent(IHostEnvironment env, IChannel ch);
    }

    /// <summary>
    /// Classes that want to publish the bundles from server channels in some fashion should implement
    /// this interface. The intended simple use case is that this will be some form of in-process web
    /// server, and then when disposed, they should stop themselves.
    ///
    /// Note that the primary communication with the server from the client code's perspective is not
    /// through method calls on this interface, but rather communication through an
    /// <see cref="IPipe{IPendingBundleNotification}"/> that the server will listen to throughout its
    /// lifetime.
    /// </summary>
    public interface IServer : IDisposable
    {
        /// <summary>
        /// This should return the base address where the server is. If this server is not actually
        /// serving content at any URL, this property should be null.
        /// </summary>
        Uri BaseAddress { get; }
    }

    /// <summary>
    /// Creates what might be considered a good "default" server factory, if possible,
    /// or <c>null</c> if no good default was possible. A <c>null</c> value could be returned,
    /// for example, if a user opted to remove all implementations of <see cref="IServer"/> and
    /// the associated <see cref="IServerFactory"/> for security reasons.
    /// </summary>
    /// <param name="env">The host environment whose component catalog is searched.</param>
    /// <returns>The factory registered under the name "mini" with its <c>Port</c> field set, or
    /// <c>null</c> if no such component exists or it has no public <c>int</c> field named <c>Port</c>.</returns>
    public static IServerFactory CreateDefaultServerFactoryOrNull(IHostEnvironment env)
    {
        Contracts.CheckValue(env, nameof(env));
        // REVIEW: There should be a better way. There currently isn't,
        // but there should be. This is pretty horrifying, but it is preferable to
        // the alternative of having core components depend on an actual server
        // implementation, since we want those to be removable because of security
        // concerns in certain environments (since not everyone will be wild about
        // web servers popping up everywhere).
        if (!env.ComponentCatalog.TryFindComponent(typeof(IServerFactory), "mini", out ComponentCatalog.ComponentInfo component))
            return null;

        var factory = (IServerFactory)Activator.CreateInstance(component.ArgumentType);
        // The port is set reflectively because this assembly has no compile-time
        // knowledge of the concrete factory type.
        var field = factory.GetType().GetField("Port");
        if (field == null || field.FieldType != typeof(int))
            return null;

        field.SetValue(factory, DefaultServerPort);
        return factory;
    }

    /// <summary>
    /// When a <see cref="ServerChannel"/> is created, the creation method will send an implementation
    /// of this interface as a notification through an <see cref="IPipe{IPendingBundleNotification}"/>,
    /// to indicate that a <see cref="Bundle"/> may be pending soon. Listeners that want to receive the
    /// bundle to expose it, for example, a web service, should register this interest by passing in an
    /// action to be called. If no listener registers interest, the server channel that sent the
    /// notification will act differently by, say, acting as a no-op w.r.t. client calls to it.
    /// </summary>
    public interface IPendingBundleNotification
    {
        /// <summary>
        /// Any publisher of the named delegates will call this method, upon receiving an instance
        /// of this object through the pipe. This method serves two purposes: firstly it detects
        /// whether anyone is even interested in publishing anything at all, so that we can just
        /// ignore any input delegates in the case where no one is listening (which, we must expect,
        /// is the majority of scenarios). The second is that it provides an action to call, once
        /// all publishing is complete, and <see cref="Publish"/> has been called by the client code.
        /// </summary>
        /// <param name="toDo">The callback to perform when all named delegates have been registered,
        /// and <see cref="Publish"/> is called.</param>
        void Acknowledge(Action<Bundle> toDo);
    }

    /// <summary>
    /// The final bundle of published named delegates that a listener can serve.
    /// </summary>
    public sealed class Bundle
    {
        /// <summary>
        /// This contains a name to delegate mappings. The delegates contained herein are guaranteed to be
        /// some variety of <see cref="Func{TResult}"/>, <see cref="Func{T1, TResult}"/>,
        /// <see cref="Func{T1, T2, TResult}"/>, etc.
        /// </summary>
        public readonly IReadOnlyDictionary<string, Delegate> NameToFuncs;

        /// <summary>
        /// This should be a more-or-less unique identifier for the type of API this bundle is producing.
        /// Its intended use is that it will form part of the URL for the RESTful API, so to the extent that
        /// it contains multiple tokens they must be slash delimited.
        /// </summary>
        public readonly string Identifier;

        // Combined done actions added through AddDoneAction; null until the first one is
        // added. Invoked by the owning server channel when it is disposed.
        internal Action Done;

        /// <summary>
        /// Captures the registered delegates and identifier of <paramref name="sch"/>.
        /// </summary>
        /// <param name="sch">The server channel being published.</param>
        internal Bundle(ServerChannel sch)
        {
            Contracts.AssertValue(sch);
            NameToFuncs = sch._toPublish;
            Identifier = sch._identifier;
        }

        /// <summary>
        /// Adds an action to run when the server channel that published this bundle is disposed.
        /// </summary>
        /// <param name="onDone">The action to run on disposal.</param>
        public void AddDoneAction(Action onDone)
        {
            Done += onDone;
        }
    }
}
[BestFriend]
internal static class ServerChannelUtilities
{
    /// <summary>
    /// Extension-method form of <see cref="ServerChannel.Start"/>, so that starting a server
    /// channel reads like the other channel creation methods on <see cref="IChannelProvider"/>.
    /// </summary>
    /// <param name="provider">The channel provider on which the server channel is started.</param>
    /// <param name="identifier">Identifies the "type" of bundle being published; it should
    /// form a path using forward-slash '/' delimiters.</param>
    /// <returns>The server channel that was started, or <c>null</c> when
    /// <paramref name="provider"/> has no listener for server channels.</returns>
    public static ServerChannel StartServerChannel(this IChannelProvider provider, string identifier)
    {
        // Validate here, before delegating, so argument failures surface from this call.
        Contracts.CheckValue(provider, nameof(provider));
        Contracts.CheckNonWhiteSpace(identifier, nameof(identifier));

        var channel = ServerChannel.Start(provider, identifier);
        return channel;
    }
}
|