File: src\libraries\System.Private.CoreLib\src\System\Diagnostics\Tracing\EventPipeEventDispatcher.cs
Web Access
Project: src\src\coreclr\System.Private.CoreLib\System.Private.CoreLib.csproj (System.Private.CoreLib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Diagnostics.Tracing
{
    internal sealed class EventPipeEventDispatcher
    {
        internal sealed class EventListenerSubscription
        {
            internal EventKeywords MatchAnyKeywords { get; }
            internal EventLevel Level { get; }
 
            internal EventListenerSubscription(EventKeywords matchAnyKeywords, EventLevel level)
            {
                MatchAnyKeywords = matchAnyKeywords;
                Level = level;
            }
        }
 
        internal static readonly EventPipeEventDispatcher Instance = new EventPipeEventDispatcher();
 
        private readonly IntPtr m_RuntimeProviderID;
 
        private ulong m_sessionID;
 
        private CancellationTokenSource? m_dispatchTaskCancellationSource;
        private Task? m_dispatchTask;
 
        // We take this lock to synchronize access to the shared session state. It is important to never take the EventSource.EventListenersLock while
        // holding this, or we can deadlock. Unfortunately calling in to EventSource at all can take the EventListenersLock in ways that are not obvious,
        // so don't call in to EventSource or other EventListeners while holding this unless you are certain it can't take the EventListenersLock.
        private readonly object m_dispatchControlLock = new object();
        private readonly Dictionary<EventListener, EventListenerSubscription> m_subscriptions = new Dictionary<EventListener, EventListenerSubscription>();
 
        private const uint DefaultEventListenerCircularMBSize = 10;
 
        private EventPipeEventDispatcher()
        {
            // Get the ID of the runtime provider so that it can be used as a filter when processing events.
            m_RuntimeProviderID = EventPipeInternal.GetProvider(NativeRuntimeEventSource.EventSourceName);
        }
 
        internal void SendCommand(EventListener eventListener, EventCommand command, bool enable, EventLevel level, EventKeywords matchAnyKeywords)
        {
            lock (m_dispatchControlLock)
            {
                if (command == EventCommand.Update && enable)
                {
                    // Add the new subscription.  This will overwrite an existing subscription for the listener if one exists.
                    m_subscriptions[eventListener] = new EventListenerSubscription(matchAnyKeywords, level);
                }
                else if (command == EventCommand.Update && !enable)
                {
                    // Remove the event listener from the list of subscribers.
                    m_subscriptions.Remove(eventListener);
                }
 
                // Commit the configuration change.
                CommitDispatchConfiguration();
            }
        }
 
        private void CommitDispatchConfiguration()
        {
            Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));
 
            // Signal that the thread should shut down
            SetStopDispatchTask();
 
            // Check to see if tracing should be enabled.
            if (m_subscriptions.Count <= 0)
            {
                return;
            }
 
            // Determine the keywords and level that should be used based on the set of enabled EventListeners.
            EventKeywords aggregatedKeywords = EventKeywords.None;
            EventLevel enableLevel = EventLevel.Critical;
 
            foreach (EventListenerSubscription subscription in m_subscriptions.Values)
            {
                aggregatedKeywords |= subscription.MatchAnyKeywords;
 
                if (enableLevel is EventLevel.LogAlways)
                {
                    continue;
                }
                if ((enableLevel < subscription.Level) ||
                    (subscription.Level is EventLevel.LogAlways))
                {
                    enableLevel = subscription.Level;
                }
            }
 
            // Enable the EventPipe session.
            EventPipeProviderConfiguration[] providerConfiguration =
            [
                new EventPipeProviderConfiguration(NativeRuntimeEventSource.EventSourceName, (ulong)aggregatedKeywords, (uint)enableLevel, null)
            ];
 
            ulong sessionID = EventPipeInternal.Enable(null, EventPipeSerializationFormat.NetTrace, DefaultEventListenerCircularMBSize, providerConfiguration);
            if (sessionID == 0)
            {
                return;
            }
 
            // Get the session information that is required to properly dispatch events.
            EventPipeSessionInfo sessionInfo;
            unsafe
            {
                if (!EventPipeInternal.GetSessionInfo(sessionID, &sessionInfo))
                {
                    Debug.Fail("GetSessionInfo returned false.");
                }
            }
 
 
            DateTime syncTimeUtc = DateTime.FromFileTimeUtc(sessionInfo.StartTimeAsUTCFileTime);
            long syncTimeQPC = sessionInfo.StartTimeStamp;
            long timeQPCFrequency = sessionInfo.TimeStampFrequency;
 
            Debug.Assert(Volatile.Read(ref m_sessionID) == 0);
            Volatile.Write(ref m_sessionID, sessionID);
 
            // Start the dispatch task.
            StartDispatchTask(sessionID, syncTimeUtc, syncTimeQPC, timeQPCFrequency);
        }
 
        private void StartDispatchTask(ulong sessionID, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency)
        {
            if (OperatingSystem.IsBrowser() || OperatingSystem.IsWasi())
            {
                throw new PlatformNotSupportedException();
            }
 
            Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));
            Debug.Assert(sessionID != 0);
 
            m_dispatchTaskCancellationSource = new CancellationTokenSource();
            Task? previousDispatchTask = m_dispatchTask;
            m_dispatchTask = Task.Factory.StartNew(() => DispatchEventsToEventListeners(sessionID, syncTimeUtc, syncTimeQPC, timeQPCFrequency, previousDispatchTask, m_dispatchTaskCancellationSource.Token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
 
        private void SetStopDispatchTask()
        {
            Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));
 
            if (m_dispatchTaskCancellationSource?.IsCancellationRequested ?? true)
            {
                return;
            }
 
            ulong sessionID = Volatile.Read(ref m_sessionID);
            Debug.Assert(sessionID != 0);
            m_dispatchTaskCancellationSource.Cancel();
            EventPipeInternal.SignalSession(sessionID);
            Volatile.Write(ref m_sessionID, 0);
        }
 
        private unsafe void DispatchEventsToEventListeners(ulong sessionID, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency, Task? previousDispatchTask, CancellationToken token)
        {
            Debug.Assert(sessionID != 0);
            previousDispatchTask?.Wait(CancellationToken.None);
 
            // Struct to fill with the call to GetNextEvent.
            EventPipeEventInstanceData instanceData;
            while (!token.IsCancellationRequested)
            {
                bool eventsReceived = false;
                // Get the next event.
                while (!token.IsCancellationRequested && EventPipeInternal.GetNextEvent(sessionID, &instanceData))
                {
                    eventsReceived = true;
 
                    // Filter based on provider.
                    if (instanceData.ProviderID == m_RuntimeProviderID)
                    {
                        // Dispatch the event.
                        ReadOnlySpan<byte> payload = new ReadOnlySpan<byte>((void*)instanceData.Payload, (int)instanceData.PayloadLength);
                        DateTime dateTimeStamp = TimeStampToDateTime(instanceData.TimeStamp, syncTimeUtc, syncTimeQPC, timeQPCFrequency);
                        NativeRuntimeEventSource.Log.ProcessEvent(instanceData.EventID, instanceData.ThreadID, dateTimeStamp, instanceData.ActivityId, instanceData.ChildActivityId, payload);
                    }
                }
 
                // Wait for more events.
                if (!token.IsCancellationRequested)
                {
                    if (!eventsReceived)
                    {
                        EventPipeInternal.WaitForSessionSignal(sessionID, Timeout.Infinite);
                    }
 
                    Thread.Sleep(10);
                }
            }
 
            // Wait for SignalSession() to be called before we call disable, otherwise
            // the SignalSession() call could be on a disabled session.
            SpinWait sw = default;
            while (Volatile.Read(ref m_sessionID) == sessionID)
            {
                sw.SpinOnce();
            }
 
            // Disable the old session. This can happen asynchronously since we aren't using the old session
            // anymore.
            EventPipeInternal.Disable(sessionID);
        }
 
        /// <summary>
        /// Converts a QueryPerformanceCounter (QPC) timestamp to a UTC DateTime.
        /// </summary>
        private static DateTime TimeStampToDateTime(long timeStamp, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency)
        {
            if (timeStamp == long.MaxValue)
            {
                return DateTime.MaxValue;
            }
 
            Debug.Assert((syncTimeUtc.Ticks != 0) && (syncTimeQPC != 0) && (timeQPCFrequency != 0));
            long inTicks = (long)((timeStamp - syncTimeQPC) * 10000000.0 / timeQPCFrequency) + syncTimeUtc.Ticks;
            if ((inTicks < 0) || (DateTime.MaxTicks < inTicks))
            {
                inTicks = DateTime.MaxTicks;
            }
 
            return new DateTime(inTicks, DateTimeKind.Utc);
        }
    }
}