|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Collections;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Text;
using Aspire.Dashboard.Utils;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Dcp.Process;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Aspire.Hosting.Dcp;
internal sealed class DcpHostService : IHostedLifecycleService, IAsyncDisposable
{
private const int LoggingSocketConnectionBacklog = 3;
private readonly ApplicationExecutor _appExecutor;
private readonly DistributedApplicationModel _applicationModel;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly DcpOptions _dcpOptions;
private readonly DistributedApplicationExecutionContext _executionContext;
private readonly IDcpDependencyCheckService _dependencyCheckService;
private readonly Locations _locations;
private readonly CancellationTokenSource _shutdownCts = new();
private Task? _logProcessorTask;
private IAsyncDisposable? _dcpRunDisposable;
// These environment variables should never be inherited by DCP from app host.
private static readonly string[] s_doNotInheritEnvironmentVars =
{
"ASPNETCORE_URLS",
"DOTNET_LAUNCH_PROFILE",
"ASPNETCORE_ENVIRONMENT",
"DOTNET_ENVIRONMENT"
};
public DcpHostService(
ILoggerFactory loggerFactory,
IOptions<DcpOptions> dcpOptions,
DistributedApplicationExecutionContext executionContext,
ApplicationExecutor appExecutor,
IDcpDependencyCheckService dependencyCheckService,
Locations locations,
DistributedApplicationModel applicationModel)
{
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<DcpHostService>();
_dcpOptions = dcpOptions.Value;
_executionContext = executionContext;
_appExecutor = appExecutor;
_dependencyCheckService = dependencyCheckService;
_locations = locations;
_applicationModel = applicationModel;
}
private bool IsSupported => !_executionContext.IsPublishMode;
public async Task StartAsync(CancellationToken cancellationToken = default)
{
if (!IsSupported)
{
return;
}
// Ensure DCP is installed and has all required dependencies
var dcpInfo = await _dependencyCheckService.GetDcpInfoAsync(cancellationToken).ConfigureAwait(false);
EnsureDcpContainerRuntime(dcpInfo);
EnsureDcpHostRunning();
await _appExecutor.RunApplicationAsync(cancellationToken).ConfigureAwait(false);
}
private void EnsureDcpContainerRuntime(DcpInfo? dcpInfo)
{
if (dcpInfo is null)
{
return;
}
// If we don't have any resources that need a container then we
// don't need to check for a healthy container runtime.
if (!_applicationModel.Resources.Any(c => c.IsContainer()))
{
return;
}
AspireEventSource.Instance.ContainerRuntimeHealthCheckStart();
try
{
DcpDependencyCheck.CheckDcpInfoAndLogErrors(_logger, _dcpOptions, dcpInfo);
}
finally
{
AspireEventSource.Instance?.ContainerRuntimeHealthCheckStop();
}
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
if (_dcpOptions.DeleteResourcesOnShutdown)
{
try
{
await _appExecutor.DeleteResourcesAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error deleting application resources.");
}
}
_shutdownCts.Cancel();
await _appExecutor.StopAsync(cancellationToken).ConfigureAwait(false);
await TaskHelpers.WaitIgnoreCancelAsync(_logProcessorTask, _logger, "Error in logging socket processor.").ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
{
if (_dcpRunDisposable is { } disposable)
{
_dcpRunDisposable = null;
try
{
await disposable.DisposeAsync().AsTask().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Shutdown requested.
}
catch (Exception ex)
{
_logger.LogCritical(ex, "One or more monitoring tasks terminated with an error.");
}
}
}
private void EnsureDcpHostRunning()
{
AspireEventSource.Instance.DcpApiServerLaunchStart();
try
{
var dcpProcessSpec = CreateDcpProcessSpec(_locations);
// Enable Unix Domain Socket based log streaming from DCP
try
{
AspireEventSource.Instance.DcpLogSocketCreateStart();
var loggingSocket = CreateLoggingSocket(_locations.DcpLogSocket);
loggingSocket.Listen(LoggingSocketConnectionBacklog);
dcpProcessSpec.EnvironmentVariables.Add("DCP_LOG_SOCKET", _locations.DcpLogSocket);
_logProcessorTask = Task.Run(() => StartLoggingSocketAsync(loggingSocket));
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to enable orchestration logging.");
}
finally
{
AspireEventSource.Instance.DcpLogSocketCreateStop();
}
(_, _dcpRunDisposable) = ProcessUtil.Run(dcpProcessSpec);
}
finally
{
AspireEventSource.Instance.DcpApiServerLaunchStop();
}
}
private ProcessSpec CreateDcpProcessSpec(Locations locations)
{
var dcpExePath = _dcpOptions.CliPath;
if (!File.Exists(dcpExePath))
{
throw new FileNotFoundException($"The Aspire application host is not installed at \"{dcpExePath}\". The application cannot be run without it.", dcpExePath);
}
var arguments = $"start-apiserver --monitor {Environment.ProcessId} --detach --kubeconfig \"{locations.DcpKubeconfigPath}\"";
if (!string.IsNullOrEmpty(_dcpOptions.ContainerRuntime))
{
arguments += $" --container-runtime \"{_dcpOptions.ContainerRuntime}\"";
}
ProcessSpec dcpProcessSpec = new ProcessSpec(dcpExePath)
{
WorkingDirectory = Directory.GetCurrentDirectory(),
Arguments = arguments,
OnOutputData = Console.Out.Write,
OnErrorData = Console.Error.Write,
InheritEnv = false,
};
_logger.LogInformation("Starting DCP with arguments: {Arguments}", dcpProcessSpec.Arguments);
foreach (DictionaryEntry de in Environment.GetEnvironmentVariables())
{
var key = de.Key?.ToString();
var val = de.Value?.ToString();
if (key is not null && val is not null && !s_doNotInheritEnvironmentVars.Contains(key))
{
dcpProcessSpec.EnvironmentVariables.Add(key, val);
}
}
if (!string.IsNullOrEmpty(_dcpOptions.ExtensionsPath))
{
dcpProcessSpec.EnvironmentVariables.Add("DCP_EXTENSIONS_PATH", _dcpOptions.ExtensionsPath);
}
if (!string.IsNullOrEmpty(_dcpOptions.BinPath))
{
dcpProcessSpec.EnvironmentVariables.Add("DCP_BIN_PATH", _dcpOptions.BinPath);
}
// Set an environment variable to contain session info that should be deleted when DCP is done
// Currently this contains the Unix socket for logging and the kubeconfig
dcpProcessSpec.EnvironmentVariables.Add("DCP_SESSION_FOLDER", locations.DcpSessionDir);
return dcpProcessSpec;
}
private static Socket CreateLoggingSocket(string socketPath)
{
string? directoryName = Path.GetDirectoryName(socketPath);
if (!string.IsNullOrEmpty(directoryName))
{
if (OperatingSystem.IsWindows())
{
Directory.CreateDirectory(directoryName);
}
else
{
Directory.CreateDirectory(directoryName, UnixFileMode.UserExecute | UnixFileMode.UserWrite | UnixFileMode.UserRead);
}
}
Socket socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
socket.Bind(new UnixDomainSocketEndPoint(socketPath));
return socket;
}
private async Task StartLoggingSocketAsync(Socket socket)
{
List<Task> outputLoggers = [];
while (!_shutdownCts.IsCancellationRequested)
{
try
{
Socket acceptedSocket = await socket.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
outputLoggers.Add(Task.Run(() => LogSocketOutputAsync(acceptedSocket, _shutdownCts.Token)));
}
catch
{
// Suppress exceptions reading logs from DCP controllers
}
}
await Task.WhenAll(outputLoggers).ConfigureAwait(false);
socket.Dispose();
}
private async Task LogSocketOutputAsync(Socket socket, CancellationToken cancellationToken)
{
using var stream = new NetworkStream(socket, ownsSocket: true);
using var _ = cancellationToken.Register(s => ((NetworkStream)s!).Close(), stream);
var reader = PipeReader.Create(stream);
// Logger cache to avoid creating a new string per log line, for a few categories
var loggerCache = new Dictionary<int, ILogger>();
(ILogger, LogLevel, string message) GetLogInfo(ReadOnlySpan<byte> line)
{
// The log format is
// <date>\t<level>\t<category>\t<log message>
// e.g. 2023-09-19T20:40:50.509-0700 info dcpctrl.ServiceReconciler service /apigateway is now in state Ready {"ServiceName": {"name":"apigateway"}}
var tab = line.IndexOf((byte)'\t');
var date = line[..tab];
line = line[(tab + 1)..];
tab = line.IndexOf((byte)'\t');
var level = line[..tab];
line = line[(tab + 1)..];
tab = line.IndexOf((byte)'\t');
var category = line[..tab];
line = line[(tab + 1)..];
var message = line;
var logLevel = LogLevel.Information;
if (level.SequenceEqual("info"u8))
{
logLevel = LogLevel.Information;
}
else if (level.SequenceEqual("error"u8))
{
logLevel = LogLevel.Error;
}
else if (level.SequenceEqual("warning"u8))
{
logLevel = LogLevel.Warning;
}
else if (level.SequenceEqual("debug"u8))
{
logLevel = LogLevel.Debug;
}
else if (level.SequenceEqual("trace"u8))
{
logLevel = LogLevel.Trace;
}
var hash = new HashCode();
hash.AddBytes(category);
var hashValue = hash.ToHashCode();
if (!loggerCache.TryGetValue(hashValue, out var logger))
{
// loggerFactory.CreateLogger internally caches, but we may as well cache the logger as well as the string
// for the lifetime of this socket
loggerCache[hashValue] = logger = _loggerFactory.CreateLogger($"Aspire.Hosting.Dcp.{Encoding.UTF8.GetString(category)}");
}
return (logger, logLevel, Encoding.UTF8.GetString(message));
}
try
{
void LogLines(in ReadOnlySequence<byte> buffer, out SequencePosition position)
{
var seq = new SequenceReader<byte>(buffer);
while (seq.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n'))
{
var (logger, logLevel, message) = GetLogInfo(line);
logger.Log(logLevel, 0, message, null, static (value, ex) => value);
}
position = seq.Position;
}
while (!cancellationToken.IsCancellationRequested)
{
var result = await reader.ReadAsync(CancellationToken.None).ConfigureAwait(false);
if (result.IsCompleted || result.IsCanceled)
{
break;
}
LogLines(result.Buffer, out var position);
reader.AdvanceTo(position, result.Buffer.End);
}
}
catch
{
// Suppress exceptions reading logs from DCP controllers
}
finally
{
reader.Complete();
}
}
public Task StartedAsync(CancellationToken _)
{
AspireEventSource.Instance.DcpHostStartupStop();
return Task.CompletedTask;
}
public Task StartingAsync(CancellationToken cancellationToken)
{
AspireEventSource.Instance.DcpHostStartupStart();
return Task.CompletedTask;
}
public Task StoppedAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StoppingAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
|