File: src\Vendoring\OpenTelemetry.Instrumentation.SqlClient\Shared\DiagnosticSourceInstrumentation\DiagnosticSourceSubscriber.cs
Web Access
Project: src\src\Components\Aspire.Microsoft.Data.SqlClient\Aspire.Microsoft.Data.SqlClient.csproj (Aspire.Microsoft.Data.SqlClient)
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
 
#nullable disable
 
using System.Diagnostics;
using OpenTelemetry.Internal;
 
namespace OpenTelemetry.Instrumentation;
 
internal sealed class DiagnosticSourceSubscriber : IDisposable, IObserver<DiagnosticListener>
{
    private readonly List<IDisposable> listenerSubscriptions;
    private readonly Func<string, ListenerHandler> handlerFactory;
    private readonly Func<DiagnosticListener, bool> diagnosticSourceFilter;
    private readonly Func<string, object, object, bool> isEnabledFilter;
    private readonly Action<string, string, Exception> logUnknownException;
    private long disposed;
    private IDisposable allSourcesSubscription;
 
    public DiagnosticSourceSubscriber(
        ListenerHandler handler,
        Func<string, object, object, bool> isEnabledFilter,
        Action<string, string, Exception> logUnknownException)
        : this(_ => handler, value => handler.SourceName == value.Name, isEnabledFilter, logUnknownException)
    {
    }
 
    public DiagnosticSourceSubscriber(
        Func<string, ListenerHandler> handlerFactory,
        Func<DiagnosticListener, bool> diagnosticSourceFilter,
        Func<string, object, object, bool> isEnabledFilter,
        Action<string, string, Exception> logUnknownException)
    {
        Guard.ThrowIfNull(handlerFactory);
 
        this.listenerSubscriptions = new List<IDisposable>();
        this.handlerFactory = handlerFactory;
        this.diagnosticSourceFilter = diagnosticSourceFilter;
        this.isEnabledFilter = isEnabledFilter;
        this.logUnknownException = logUnknownException;
    }
 
    public void Subscribe()
    {
        if (this.allSourcesSubscription == null)
        {
            this.allSourcesSubscription = DiagnosticListener.AllListeners.Subscribe(this);
        }
    }
 
    public void OnNext(DiagnosticListener value)
    {
        if ((Interlocked.Read(ref this.disposed) == 0) &&
            this.diagnosticSourceFilter(value))
        {
            var handler = this.handlerFactory(value.Name);
            var listener = new DiagnosticSourceListener(handler, this.logUnknownException);
            var subscription = this.isEnabledFilter == null ?
                value.Subscribe(listener) :
                value.Subscribe(listener, this.isEnabledFilter);
 
            lock (this.listenerSubscriptions)
            {
                this.listenerSubscriptions.Add(subscription);
            }
        }
    }
 
    public void OnCompleted()
    {
    }
 
    public void OnError(Exception error)
    {
    }
 
    /// <inheritdoc/>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
 
    private void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.disposed, 1, 0) == 1)
        {
            return;
        }
 
        lock (this.listenerSubscriptions)
        {
            foreach (var listenerSubscription in this.listenerSubscriptions)
            {
                listenerSubscription?.Dispose();
            }
 
            this.listenerSubscriptions.Clear();
        }
 
        this.allSourcesSubscription?.Dispose();
        this.allSourcesSubscription = null;
    }
}