// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable disable

using System.Diagnostics;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Instrumentation;

/// <summary>
/// Observes <see cref="DiagnosticListener.AllListeners"/> and attaches a
/// <see cref="DiagnosticSourceListener"/> to every <see cref="DiagnosticListener"/>
/// accepted by the configured source filter. All subscriptions created this way
/// are released when the subscriber is disposed.
/// </summary>
internal sealed class DiagnosticSourceSubscriber : IDisposable, IObserver<DiagnosticListener>
{
    // Subscriptions to individual listeners; also used as the lock that guards itself.
    private readonly List<IDisposable> listenerSubscriptions;
    private readonly Func<string, ListenerHandler> handlerFactory;
    private readonly Func<DiagnosticListener, bool> diagnosticSourceFilter;
    private readonly Func<string, object, object, bool> isEnabledFilter;
    private readonly Action<string, string, Exception> logUnknownException;

    // 0 = live, 1 = disposed. Accessed through Interlocked only.
    private long disposed;
    private IDisposable allSourcesSubscription;

    /// <summary>
    /// Initializes a new instance of the <see cref="DiagnosticSourceSubscriber"/> class
    /// that uses a single handler and only accepts listeners whose name equals
    /// the handler's <c>SourceName</c>.
    /// </summary>
    /// <param name="handler">Handler used for every accepted listener.</param>
    /// <param name="isEnabledFilter">Optional filter forwarded to the listener subscription; may be <c>null</c>.</param>
    /// <param name="logUnknownException">Callback handed to each created <see cref="DiagnosticSourceListener"/>.</param>
    public DiagnosticSourceSubscriber(
        ListenerHandler handler,
        Func<string, object, object, bool> isEnabledFilter,
        Action<string, string, Exception> logUnknownException)
        : this(_ => handler, value => handler.SourceName == value.Name, isEnabledFilter, logUnknownException)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="DiagnosticSourceSubscriber"/> class.
    /// </summary>
    /// <param name="handlerFactory">Creates the handler for a listener, given the listener name. Must not be <c>null</c>.</param>
    /// <param name="diagnosticSourceFilter">Decides whether a published listener should be subscribed to.</param>
    /// <param name="isEnabledFilter">Optional filter forwarded to the listener subscription; may be <c>null</c>.</param>
    /// <param name="logUnknownException">Callback handed to each created <see cref="DiagnosticSourceListener"/>.</param>
    public DiagnosticSourceSubscriber(
        Func<string, ListenerHandler> handlerFactory,
        Func<DiagnosticListener, bool> diagnosticSourceFilter,
        Func<string, object, object, bool> isEnabledFilter,
        Action<string, string, Exception> logUnknownException)
    {
        Guard.ThrowIfNull(handlerFactory);

        this.listenerSubscriptions = new List<IDisposable>();
        this.handlerFactory = handlerFactory;
        this.diagnosticSourceFilter = diagnosticSourceFilter;
        this.isEnabledFilter = isEnabledFilter;
        this.logUnknownException = logUnknownException;
    }

    /// <summary>
    /// Starts observing <see cref="DiagnosticListener.AllListeners"/>. Calling this
    /// more than once has no additional effect, and calling it after
    /// <see cref="Dispose()"/> does nothing.
    /// </summary>
    public void Subscribe()
    {
        // Once disposed, Dispose() will never run its cleanup again, so a
        // subscription created here could not be released. Do not create one.
        if (Interlocked.Read(ref this.disposed) != 0)
        {
            return;
        }

        this.allSourcesSubscription ??= DiagnosticListener.AllListeners.Subscribe(this);
    }

    /// <summary>
    /// Called for each published <see cref="DiagnosticListener"/>. Subscribes a new
    /// <see cref="DiagnosticSourceListener"/> to it when the subscriber is still live
    /// and the source filter accepts the listener.
    /// </summary>
    /// <param name="value">The published diagnostic listener.</param>
    public void OnNext(DiagnosticListener value)
    {
        if (Interlocked.Read(ref this.disposed) != 0 || !this.diagnosticSourceFilter(value))
        {
            return;
        }

        var handler = this.handlerFactory(value.Name);
        var listener = new DiagnosticSourceListener(handler, this.logUnknownException);
        var subscription = this.isEnabledFilter == null
            ? value.Subscribe(listener)
            : value.Subscribe(listener, this.isEnabledFilter);

        lock (this.listenerSubscriptions)
        {
            // Re-check under the lock: Dispose() sets the flag before it takes
            // this lock, so if the flag is still clear here the subscription is
            // guaranteed to be seen (and released) by a later Dispose().
            if (Interlocked.Read(ref this.disposed) == 0)
            {
                this.listenerSubscriptions.Add(subscription);
                return;
            }
        }

        // Dispose() ran between the first check and the lock; the list has been
        // (or is about to be) drained, so release this subscription ourselves.
        subscription?.Dispose();
    }

    /// <summary>
    /// No-op; completion of the all-listeners stream requires no action.
    /// </summary>
    public void OnCompleted()
    {
    }

    /// <summary>
    /// No-op; errors reported by the all-listeners stream are ignored.
    /// </summary>
    /// <param name="error">The reported error.</param>
    public void OnError(Exception error)
    {
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases every listener subscription and the all-listeners subscription.
    /// Only the first call does any work.
    /// </summary>
    /// <param name="disposing">Not used by this implementation.</param>
    private void Dispose(bool disposing)
    {
        // Atomically flip 0 -> 1; any caller that observes 1 lost the race.
        if (Interlocked.CompareExchange(ref this.disposed, 1, 0) == 1)
        {
            return;
        }

        lock (this.listenerSubscriptions)
        {
            foreach (var listenerSubscription in this.listenerSubscriptions)
            {
                listenerSubscription?.Dispose();
            }

            this.listenerSubscriptions.Clear();
        }

        this.allSourcesSubscription?.Dispose();
        this.allSourcesSubscription = null;
    }
}