File: Processor.cs
Web Access
Project: src\playground\AspireEventHub\EventHubsConsumer\EventHubsConsumer.csproj (EventHubsConsumer)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
 
namespace EventHubsConsumer;
 
/// <summary>
///   Demonstrates how to use the <see cref="EventProcessorClient"/> to process events from an Azure Event Hub.
/// </summary>
/// <remarks>
///   The EventProcessorClient in Azure Event Hubs is a powerful tool that allows for the processing of events from multiple partitions concurrently.
///   It’s crucial to spread the reading of partitions across multiple nodes for load balancing and to maximize throughput.
///   This is achieved by assigning each partition to a specific node, ensuring that the workload is evenly distributed.
///   The EventProcessorClient is configured with a <see cref="BlobContainerClient"/> to store checkpoint and ownership data.
///   Use <see cref="EventProcessorClientOptions"/> to specify options such as the maximum wait time and prefetch count.
///   Proper configuration and usage of the EventProcessorClient can significantly improve the efficiency and performance of your Azure Event Hubs
///   applications. Remember to handle exceptions and monitor the health of your nodes to ensure smooth operation.
/// </remarks>
/// <example>
///   See samples at https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples
/// </example>
internal sealed class Processor(EventProcessorClient client, ILogger<Processor> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("Starting processor...");
 
        client.ProcessEventAsync += async arg =>
        {
            logger.LogInformation(arg.Data.EventBody.ToString());
 
            // save our current position in the configured storage account
            await arg.UpdateCheckpointAsync(stoppingToken);
        };
 
        client.ProcessErrorAsync += args =>
        {
            logger.LogError(args.Exception, "Error processing message: {Error}", args.Exception.Message);
            return Task.CompletedTask;
        };
 
        await client.StartProcessingAsync(stoppingToken);
    }
 
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("Stopping processor...");
        await client.StopProcessingAsync(cancellationToken);
    }
}