|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Text;
using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Azure.EventHubs;
using Aspire.Hosting.Utils;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;
namespace Aspire.Hosting.Azure.Tests;
public class AzureEventHubsExtensionsTests(ITestOutputHelper testOutputHelper)
{
/// <summary>
/// Verifies that a resource declared with <c>WaitFor</c> on the Event Hubs emulator remains in the
/// <c>Waiting</c> state until the emulator's health check completes as healthy, and only then
/// transitions to <c>Running</c>.
/// </summary>
[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnEventHubsEmulatorBlocksDependentResources()
{
    // Bounds every wait in this test; disposed so the timeout timer is released when the test ends.
    using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
    using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);

    // The health check stays pending until the test completes this source.
    // RunContinuationsAsynchronously keeps SetResult from running the awaiting continuations inline.
    var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>(TaskCreationOptions.RunContinuationsAsynchronously);
    builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => healthCheckTcs.Task);

    var resource = builder.AddAzureEventHubs("resource")
        .AddEventHub("hubx")
        .RunAsEmulator()
        .WithHealthCheck("blocking_check");

    var dependentResource = builder.AddContainer("nginx", "mcr.microsoft.com/cbl-mariner/base/nginx", "1.22")
        .WaitFor(resource);

    using var app = builder.Build();

    // Not awaited yet: the test observes resource states while startup is still in flight.
    var pendingStart = app.StartAsync(cts.Token);

    var rns = app.Services.GetRequiredService<ResourceNotificationService>();

    // The emulator is running but its health check is still pending, so the dependent must be waiting.
    await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);
    await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

    // Release the health check and expect the dependent resource to start.
    healthCheckTcs.SetResult(HealthCheckResult.Healthy());

    await rns.WaitForResourceHealthyAsync(resource.Resource.Name, cts.Token);
    await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

    await pendingStart;
    await app.StopAsync();
}
/// <summary>
/// Starts the Event Hubs emulator, sends a single event through a producer client resolved from a
/// separate host, and verifies that a consumer client reads the same payload back.
/// </summary>
[Fact]
[RequiresDocker]
public async Task VerifyAzureEventHubsEmulatorResource()
{
    // Bounds the whole test so that a consumer which never sees the event cannot hang the run.
    using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
    using var builder = TestDistributedApplicationBuilder.Create().WithTestAndResourceLogging(testOutputHelper);

    var eventHub = builder.AddAzureEventHubs("eventhubns")
        .RunAsEmulator()
        .AddEventHub("hub");

    using var app = builder.Build();
    await app.StartAsync(cts.Token);

    // Build a second host whose clients are configured from the emulator's connection string.
    var hb = Host.CreateApplicationBuilder();
    hb.Configuration["ConnectionStrings:eventhubns"] = await eventHub.Resource.ConnectionStringExpression.GetValueAsync(cts.Token);
    hb.AddAzureEventHubProducerClient("eventhubns", settings => settings.EventHubName = "hub");
    hb.AddAzureEventHubConsumerClient("eventhubns", settings => settings.EventHubName = "hub");

    using var host = hb.Build();
    await host.StartAsync(cts.Token);

    var producerClient = host.Services.GetRequiredService<EventHubProducerClient>();
    var consumerClient = host.Services.GetRequiredService<EventHubConsumerClient>();

    // If no exception is thrown when awaited, the Event Hubs service has acknowledged
    // receipt and assumed responsibility for delivery of the set of events to its partition.
    await producerClient.SendAsync([new EventData(Encoding.UTF8.GetBytes("hello worlds"))], cts.Token);

    var readOptions = new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(5) };
    await foreach (var partitionEvent in consumerClient.ReadEventsAsync(readOptions, cts.Token))
    {
        // With MaximumWaitTime set, the client yields an empty event (Data is null) whenever the
        // wait elapses with nothing to read. Skip those and keep waiting until the token times out.
        if (partitionEvent.Data is null)
        {
            continue;
        }

        Assert.Equal("hello worlds", Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray()));
        break;
    }
}
/// <summary>
/// Calling <c>WithDataBindMount()</c> without arguments inside the emulator callback is expected to
/// produce a writable bind mount from <c>{AppHostDirectory}/.eventhubs/eh</c> to <c>/data</c>.
/// </summary>
[Fact]
public void AzureEventHubsUseEmulatorCallbackWithWithDataBindMountResultsInBindMountAnnotationWithDefaultPath()
{
    using var builder = TestDistributedApplicationBuilder.Create();

    var eventHubs = builder
        .AddAzureEventHubs("eh")
        .RunAsEmulator(configureContainer: container => container.WithDataBindMount());

    // Ignoring the annotation created for the custom Config.json file
    var dataMount = eventHubs.Resource.Annotations
        .OfType<ContainerMountAnnotation>()
        .Single(mount => !mount.Target.Contains("Config.json"));
    var expectedSource = Path.Combine(builder.AppHostDirectory, ".eventhubs", "eh");

    Assert.Equal(expectedSource, dataMount.Source);
    Assert.Equal("/data", dataMount.Target);
    Assert.Equal(ContainerMountType.BindMount, dataMount.Type);
    Assert.False(dataMount.IsReadOnly);
}
/// <summary>
/// Calling <c>WithDataBindMount("mydata")</c> inside the emulator callback is expected to produce a
/// writable bind mount from <c>{AppHostDirectory}/mydata</c> to <c>/data</c>.
/// </summary>
[Fact]
public void AzureEventHubsUseEmulatorCallbackWithWithDataBindMountResultsInBindMountAnnotation()
{
    using var builder = TestDistributedApplicationBuilder.Create();

    var eventHubs = builder
        .AddAzureEventHubs("eh")
        .RunAsEmulator(configureContainer: container => container.WithDataBindMount("mydata"));

    // Ignoring the annotation created for the custom Config.json file
    var dataMount = eventHubs.Resource.Annotations
        .OfType<ContainerMountAnnotation>()
        .Single(mount => !mount.Target.Contains("Config.json"));
    var expectedSource = Path.Combine(builder.AppHostDirectory, "mydata");

    Assert.Equal(expectedSource, dataMount.Source);
    Assert.Equal("/data", dataMount.Target);
    Assert.Equal(ContainerMountType.BindMount, dataMount.Type);
    Assert.False(dataMount.IsReadOnly);
}
/// <summary>
/// Calling <c>WithDataVolume()</c> without arguments inside the emulator callback is expected to
/// produce a writable volume named <c>{volume prefix}-eh-data</c> mounted at <c>/data</c>.
/// </summary>
[Fact]
public void AzureEventHubsUseEmulatorCallbackWithWithDataVolumeResultsInVolumeAnnotationWithDefaultName()
{
    using var builder = TestDistributedApplicationBuilder.Create();

    var eventHubs = builder
        .AddAzureEventHubs("eh")
        .RunAsEmulator(configureContainer: container => container.WithDataVolume());

    // Ignoring the annotation created for the custom Config.json file
    var dataVolume = eventHubs.Resource.Annotations
        .OfType<ContainerMountAnnotation>()
        .Single(mount => !mount.Target.Contains("Config.json"));
    var expectedName = $"{builder.GetVolumePrefix()}-eh-data";

    Assert.Equal(expectedName, dataVolume.Source);
    Assert.Equal("/data", dataVolume.Target);
    Assert.Equal(ContainerMountType.Volume, dataVolume.Type);
    Assert.False(dataVolume.IsReadOnly);
}
/// <summary>
/// Calling <c>WithDataVolume("mydata")</c> inside the emulator callback is expected to produce a
/// writable volume named <c>mydata</c> mounted at <c>/data</c>.
/// </summary>
[Fact]
public void AzureEventHubsUseEmulatorCallbackWithWithDataVolumeResultsInVolumeAnnotation()
{
    using var builder = TestDistributedApplicationBuilder.Create();

    var eventHubs = builder
        .AddAzureEventHubs("eh")
        .RunAsEmulator(configureContainer: container => container.WithDataVolume("mydata"));

    // Ignoring the annotation created for the custom Config.json file
    var dataVolume = eventHubs.Resource.Annotations
        .OfType<ContainerMountAnnotation>()
        .Single(mount => !mount.Target.Contains("Config.json"));

    Assert.Equal("mydata", dataVolume.Source);
    Assert.Equal("/data", dataVolume.Target);
    Assert.Equal(ContainerMountType.Volume, dataVolume.Type);
    Assert.False(dataVolume.IsReadOnly);
}
/// <summary>
/// Verifies that the port passed to <c>WithGatewayPort</c> (including <c>null</c>) is the port
/// recorded on the emulator resource's only endpoint annotation.
/// </summary>
/// <param name="port">The gateway port to request, or <c>null</c> to leave it unspecified.</param>
[Theory]
[InlineData(null)]
[InlineData(8081)]
[InlineData(9007)]
public void AzureEventHubsWithEmulatorGetsExpectedPort(int? port = null)
{
    using var builder = TestDistributedApplicationBuilder.Create();

    // The callback parameter is named "container" so it does not shadow the outer application builder.
    var eventHubs = builder.AddAzureEventHubs("eventhubs").RunAsEmulator(configureContainer: container =>
    {
        container.WithGatewayPort(port);
    });

    // Assert.Single both checks that exactly one endpoint exists and returns it for inspection.
    var endpoint = Assert.Single(eventHubs.Resource.Annotations.OfType<EndpointAnnotation>());
    Assert.Equal(port, endpoint.Port);
}
/// <summary>
/// Verifies the container image annotation of the emulator: the tag is the one supplied through
/// <c>WithImageTag</c>, or <c>EventHubsEmulatorContainerImageTags.Tag</c> when none is supplied,
/// while registry and image always match <c>EventHubsEmulatorContainerImageTags</c>.
/// </summary>
/// <param name="imageTag">The image tag to apply, or <c>null</c> to keep the default tag.</param>
[Theory]
[InlineData(null)]
[InlineData("2.3.97-preview")]
[InlineData("1.0.7")]
public void AzureEventHubsWithEmulatorGetsExpectedImageTag(string? imageTag)
{
    using var builder = TestDistributedApplicationBuilder.Create();
    var eventHubs = builder.AddAzureEventHubs("eventhubs");

    eventHubs.RunAsEmulator(container =>
    {
        // Only override the tag when the theory case supplies one.
        if (string.IsNullOrEmpty(imageTag))
        {
            return;
        }

        container.WithImageTag(imageTag);
    });

    var imageAnnotation = eventHubs.Resource.Annotations
        .OfType<ContainerImageAnnotation>()
        .FirstOrDefault();
    Assert.NotNull(imageAnnotation);

    var expectedTag = imageTag ?? EventHubsEmulatorContainerImageTags.Tag;
    Assert.Equal(expectedTag, imageAnnotation.Tag);
    Assert.Equal(EventHubsEmulatorContainerImageTags.Registry, imageAnnotation.Registry);
    Assert.Equal(EventHubsEmulatorContainerImageTags.Image, imageAnnotation.Image);
}
}
|