File: Utils\Batching.cs
Web Access
Project: src\src\Libraries\Microsoft.Extensions.DataIngestion\Microsoft.Extensions.DataIngestion.csproj (Microsoft.Extensions.DataIngestion)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;
 
namespace Microsoft.Extensions.DataIngestion;
 
internal static class Batching
{
    internal static async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync<TMetadata>(IAsyncEnumerable<IngestionChunk<string>> chunks,
        EnricherOptions options,
        string metadataKey,
        ChatMessage systemPrompt,
        ILogger? logger,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
        where TMetadata : notnull
    {
        _ = Throw.IfNull(chunks);
 
        await foreach (var batch in chunks.Chunk(options.BatchSize).WithCancellation(cancellationToken))
        {
            List<AIContent> contents = new(batch.Length);
            foreach (var chunk in batch)
            {
                contents.Add(new TextContent(chunk.Content));
            }
 
            try
            {
                ChatResponse<TMetadata[]> response = await options.ChatClient.GetResponseAsync<TMetadata[]>(
                [
                    systemPrompt,
                    new(ChatRole.User, contents)
                ], options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
 
                if (response.Result.Length == contents.Count)
                {
                    for (int i = 0; i < response.Result.Length; i++)
                    {
                        batch[i].Metadata[metadataKey] = response.Result[i];
                    }
                }
                else
                {
                    logger?.UnexpectedResultsCount(response.Result.Length, contents.Count);
                }
            }
#pragma warning disable CA1031 // Do not catch general exception types
            catch (Exception ex)
#pragma warning restore CA1031 // Do not catch general exception types
            {
                // Enricher failures should not fail the whole ingestion pipeline, as they are best-effort enhancements.
                logger?.UnexpectedEnricherFailure(ex);
            }
 
            foreach (var chunk in batch)
            {
                yield return chunk;
            }
        }
    }
}