// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace Aspire.Hosting;
/// <summary>
/// Provides extension methods for adding Kafka resources to the application model.
/// </summary>
public static class KafkaBuilderExtensions
{
private const int KafkaBrokerPort = 9092;
private const int KafkaInternalBrokerPort = 9093;
private const int KafkaUIPort = 8080;
private const string Target = "/var/lib/kafka/data";
/// <summary>
/// Adds a Kafka resource to the application. A container is used for local development.
/// </summary>
/// <remarks>
/// This version of the package defaults to the <inheritdoc cref="KafkaContainerImageTags.Tag"/> tag of the <inheritdoc cref="KafkaContainerImageTags.Image"/> container image.
/// </remarks>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
/// <param name="port">The host port of Kafka broker.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
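/// <example>
/// A minimal usage sketch; the <c>Projects.Producer</c> project reference is illustrative:
/// <code language="csharp">
/// var builder = DistributedApplication.CreateBuilder(args);
///
/// var kafka = builder.AddKafka("kafka");
///
/// builder.AddProject&lt;Projects.Producer&gt;("producer")
///        .WithReference(kafka);
///
/// builder.Build().Run();
/// </code>
/// </example>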
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, [ResourceName] string name, int? port = null)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(name);
var kafka = new KafkaServerResource(name);
string? connectionString = null;
builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(kafka, async (@event, ct) =>
{
connectionString = await kafka.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{kafka.Name}' resource but the connection string was null.");
}
});
var healthCheckKey = $"{name}_check";
// NOTE: We cannot use the HealthChecks.Kafka AddKafka extension here because it registers the health check as a singleton
// which means if you have multiple Kafka resources the factory callback will end
// up using the connection string of the last Kafka resource that was added. The
// client packages also have to work around this issue.
//
// SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298
var healthCheckRegistration = new HealthCheckRegistration(
healthCheckKey,
sp =>
{
var options = new KafkaHealthCheckOptions();
options.Configuration = new ProducerConfig();
options.Configuration.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable");
return new KafkaHealthCheck(options);
},
failureStatus: default,
tags: default);
builder.Services.AddHealthChecks().Add(healthCheckRegistration);
return builder.AddResource(kafka)
.WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName)
.WithEndpoint(targetPort: KafkaInternalBrokerPort, name: KafkaServerResource.InternalEndpointName)
.WithImage(KafkaContainerImageTags.Image, KafkaContainerImageTags.Tag)
.WithImageRegistry(KafkaContainerImageTags.Registry)
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka))
.WithHealthCheck(healthCheckKey);
}
/// <summary>
/// Adds a Kafka UI container to the application.
/// </summary>
/// <remarks>
/// This version of the package defaults to the <inheritdoc cref="KafkaContainerImageTags.KafkaUiTag"/> tag of the <inheritdoc cref="KafkaContainerImageTags.KafkaUiImage"/> container image.
/// </remarks>
/// <param name="builder">The Kafka server resource builder.</param>
/// <param name="configureContainer">Configuration callback for KafkaUI container resource.</param>
/// <param name="containerName">The name of the container (Optional).</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
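/// <example>
/// A minimal usage sketch that runs a Kafka UI dashboard next to the broker (resource names are illustrative):
/// <code language="csharp">
/// var builder = DistributedApplication.CreateBuilder(args);
///
/// var kafka = builder.AddKafka("kafka")
///                    .WithKafkaUI();
///
/// builder.Build().Run();
/// </code>
/// </example>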
public static IResourceBuilder<KafkaServerResource> WithKafkaUI(this IResourceBuilder<KafkaServerResource> builder, Action<IResourceBuilder<KafkaUIContainerResource>>? configureContainer = null, string? containerName = null)
{
ArgumentNullException.ThrowIfNull(builder);
if (builder.ApplicationBuilder.Resources.OfType<KafkaUIContainerResource>().SingleOrDefault() is { } existingKafkaUIResource)
{
var builderForExistingResource = builder.ApplicationBuilder.CreateResourceBuilder(existingKafkaUIResource);
configureContainer?.Invoke(builderForExistingResource);
return builder;
}
else
{
containerName ??= $"{builder.Resource.Name}-kafka-ui";
var kafkaUi = new KafkaUIContainerResource(containerName);
var kafkaUiBuilder = builder.ApplicationBuilder.AddResource(kafkaUi)
.WithImage(KafkaContainerImageTags.KafkaUiImage, KafkaContainerImageTags.KafkaUiTag)
.WithImageRegistry(KafkaContainerImageTags.Registry)
.WithHttpEndpoint(targetPort: KafkaUIPort)
.ExcludeFromManifest();
builder.ApplicationBuilder.Eventing.Subscribe<AfterEndpointsAllocatedEvent>((e, ct) =>
{
var kafkaResources = builder.ApplicationBuilder.Resources.OfType<KafkaServerResource>();
int i = 0;
foreach (var kafkaResource in kafkaResources)
{
if (kafkaResource.InternalEndpoint.IsAllocated)
{
var endpoint = kafkaResource.InternalEndpoint;
int index = i;
kafkaUiBuilder.WithEnvironment(context => ConfigureKafkaUIContainer(context, endpoint, index));
}
i++;
}
return Task.CompletedTask;
});
configureContainer?.Invoke(kafkaUiBuilder);
return builder;
}
static void ConfigureKafkaUIContainer(EnvironmentCallbackContext context, EndpointReference endpoint, int index)
{
var bootstrapServers = context.ExecutionContext.IsRunMode
// In run mode, Kafka UI assumes Kafka is being accessed over a default Aspire container network and hardcodes the host as the Kafka resource name
// This will need to be refactored once updated service discovery APIs are available
? ReferenceExpression.Create($"{endpoint.Resource.Name}:{endpoint.Property(EndpointProperty.TargetPort)}")
: ReferenceExpression.Create($"{endpoint.Property(EndpointProperty.Host)}:{endpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables.Add($"KAFKA_CLUSTERS_{index}_NAME", endpoint.Resource.Name);
context.EnvironmentVariables.Add($"KAFKA_CLUSTERS_{index}_BOOTSTRAPSERVERS", bootstrapServers);
}
}
/// <summary>
/// Configures the host port that the Kafka UI resource is exposed on instead of using a randomly assigned port.
/// </summary>
/// <param name="builder">The resource builder for KafkaUI.</param>
/// <param name="port">The port to bind on the host. If <see langword="null"/> is used random port will be assigned.</param>
/// <returns>The resource builder for KafkaUI.</returns>
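/// <example>
/// A minimal sketch, assuming <c>builder</c> is an existing <see cref="IDistributedApplicationBuilder"/>; the port value 9100 is illustrative:
/// <code language="csharp">
/// var kafka = builder.AddKafka("kafka")
///                    .WithKafkaUI(ui => ui.WithHostPort(9100));
/// </code>
/// </example>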
public static IResourceBuilder<KafkaUIContainerResource> WithHostPort(this IResourceBuilder<KafkaUIContainerResource> builder, int? port)
{
ArgumentNullException.ThrowIfNull(builder);
return builder.WithEndpoint("http", endpoint =>
{
endpoint.Port = port;
});
}
/// <summary>
/// Adds a named volume for the data folder to a Kafka container resource.
/// </summary>
/// <param name="builder">The resource builder.</param>
/// <param name="name">The name of the volume. Defaults to an auto-generated name based on the application and resource names.</param>
/// <param name="isReadOnly">A flag that indicates if this is a read-only volume.</param>
/// <returns>The <see cref="IResourceBuilder{T}"/>.</returns>
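/// <example>
/// A minimal sketch, assuming <c>builder</c> is an existing <see cref="IDistributedApplicationBuilder"/>; the volume name is optional and illustrative:
/// <code language="csharp">
/// var kafka = builder.AddKafka("kafka")
///                    .WithDataVolume("kafka-data");
/// </code>
/// </example>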
public static IResourceBuilder<KafkaServerResource> WithDataVolume(this IResourceBuilder<KafkaServerResource> builder, string? name = null, bool isReadOnly = false)
{
ArgumentNullException.ThrowIfNull(builder);
return builder
.WithEnvironment(ConfigureLogDirs)
.WithVolume(name ?? VolumeNameGenerator.Generate(builder, "data"), Target, isReadOnly);
}
/// <summary>
/// Adds a bind mount for the data folder to a Kafka container resource.
/// </summary>
/// <param name="builder">The resource builder.</param>
/// <param name="source">The source directory on the host to mount into the container.</param>
/// <param name="isReadOnly">A flag that indicates if this is a read-only mount.</param>
/// <returns>The <see cref="IResourceBuilder{T}"/>.</returns>
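/// <example>
/// A minimal sketch, assuming <c>builder</c> is an existing <see cref="IDistributedApplicationBuilder"/>; the host path is illustrative:
/// <code language="csharp">
/// var kafka = builder.AddKafka("kafka")
///                    .WithDataBindMount("./kafka/data");
/// </code>
/// </example>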
public static IResourceBuilder<KafkaServerResource> WithDataBindMount(this IResourceBuilder<KafkaServerResource> builder, string source, bool isReadOnly = false)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(source);
return builder
.WithEnvironment(ConfigureLogDirs)
.WithBindMount(source, Target, isReadOnly);
}
private static void ConfigureKafkaContainer(EnvironmentCallbackContext context, KafkaServerResource resource)
{
// confluentinc/confluent-local is a docker image that contains a Kafka broker started with KRaft to avoid pulling a separate image for ZooKeeper.
// See https://github.com/confluentinc/kafka-images/blob/master/local/README.md.
// When not explicitly set, the default configuration is applied.
// See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults for more details.
// Define the default listeners + an internal listener for container-to-broker communication.
context.EnvironmentVariables.Add($"KAFKA_LISTENERS", $"PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:{KafkaBrokerPort},PLAINTEXT_INTERNAL://0.0.0.0:{KafkaInternalBrokerPort}");
// Set the security protocol map for the default listeners + the internal listener to PLAINTEXT.
context.EnvironmentVariables.Add("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT");
// primaryEndpoint is the endpoint that is exposed to the host machine
var primaryEndpoint = resource.PrimaryEndpoint;
// internalEndpoint is the endpoint that is used for communication between containers
var internalEndpoint = resource.InternalEndpoint;
var advertisedListeners = context.ExecutionContext.IsRunMode
// In run mode, PLAINTEXT_INTERNAL assumes Kafka is being accessed over a default Aspire container network and hardcodes the resource address.
// This will need to be refactored once updated service discovery APIs are available
? ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_INTERNAL://{resource.Name}:{internalEndpoint.Property(EndpointProperty.TargetPort)}")
: ReferenceExpression.Create(
$"PLAINTEXT://{primaryEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{primaryEndpoint.Property(EndpointProperty.Host)}:{primaryEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_INTERNAL://{internalEndpoint.Property(EndpointProperty.Host)}:{internalEndpoint.Property(EndpointProperty.Port)}");
context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;
}
/// <summary>
/// Only needs to be called when Kafka data should be persisted, i.e. when a data volume or bind mount is configured.
/// </summary>
/// <param name="context">The environment callback context for the Kafka container.</param>
private static void ConfigureLogDirs(EnvironmentCallbackContext context)
{
context.EnvironmentVariables["KAFKA_LOG_DIRS"] = Target;
}
}