File: System\Net\Sockets\SocketAsyncEngine.Unix.cs
Web Access
Project: src\src\libraries\System.Net.Sockets\src\System.Net.Sockets.csproj (System.Net.Sockets)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
 
namespace System.Net.Sockets
{
    internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
    {
        // Number of socket events requested from the native event port per wait: it is passed to
        // CreateSocketEventBuffer when the buffer is created and used as the initial numEvents value
        // for every WaitForSocketEvents call in EventLoop().
        // NOTE(review): the much smaller DEBUG value presumably exists to exercise the "buffer full" path
        // more often in test builds — confirm.
        private const int EventBufferCount =
#if DEBUG
            32;
#else
            1024;
#endif
 
        // Socket continuations are dispatched to the ThreadPool from the event thread.
        // This avoids continuations blocking the event handling.
        // Setting PreferInlineCompletions allows continuations to run directly on the event thread.
        // PreferInlineCompletions defaults to false and can be set to true using the DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS envvar.
        // The environment variable is read once, when this type is initialized, and only the exact value "1" enables the feature.
        internal static readonly bool InlineSocketCompletionsEnabled = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";
 
        // Computes how many SocketAsyncEngine instances (each with its own event thread) to create.
        // Precedence: a usable DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT value, then ProcessorCount when inline
        // completions are enabled, then an architecture-dependent cores-per-engine heuristic (minimum 1).
        private static int GetEngineCount()
        {
            // The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
            // and schedule corresponding work items to ThreadPool (socket reads and writes).
            //
            // Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load
            // we have observed that a single engine is capable of keeping busy up to thirty x64 and eight ARM64 CPU Cores.
            //
            // The vast majority of real-life scenarios is never going to generate such a huge load (hundreds of thousands of requests per second)
            // and having a single producer should be almost always enough.
            //
            // We want to be sure that we can handle extreme loads and that's why we have decided to use these values.
            //
            // It's impossible to predict all possible scenarios so we have added a possibility to configure this value using environment variables.
            //
            // Out-of-range overrides are ignored and the defaults below are used instead:
            // - 0 would produce an empty engine array, and TryRegisterSocket would then throw DivideByZeroException
            //   (index % 0) for every socket;
            // - values above int.MaxValue would turn into a negative array length after the cast.
            if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) &&
                count != 0 &&
                count <= int.MaxValue)
            {
                return (int)count;
            }

            // When inlining continuations, we default to ProcessorCount to make sure event threads cannot be a bottleneck.
            if (InlineSocketCompletionsEnabled)
            {
                return Environment.ProcessorCount;
            }

            Architecture architecture = RuntimeInformation.ProcessArchitecture;
            int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
                ? 8
                : 30;

            // Round to the nearest whole engine, but never go below one.
            return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
        }
 
        // All engines, created once when this type is initialized. This initializer runs after the one for
        // InlineSocketCompletionsEnabled (declared earlier in the file), which GetEngineCount() reads.
        private static readonly SocketAsyncEngine[] s_engines = CreateEngines();
        // Counter incremented by TryRegisterSocket to pick the next engine in turn.
        // Starts at -1 so that the first Interlocked.Increment selects index 0.
        private static int s_allocateFromEngine = -1;
 
        // Allocates the engine array (sized by GetEngineCount()) and fills every slot with a new engine.
        // Constructing an engine also starts its event thread.
        private static SocketAsyncEngine[] CreateEngines()
        {
            var result = new SocketAsyncEngine[GetEngineCount()];

            int slot = 0;
            while (slot < result.Length)
            {
                result[slot] = new SocketAsyncEngine();
                slot++;
            }

            return result;
        }
 
        // Native event port handle. Set to (IntPtr)(-1) at the start of the constructor and then filled in by
        // Interop.Sys.CreateSocketEventPort; FreeNativeResources() treats -1 as "never created".
        private readonly IntPtr _port;
        // Native event buffer filled in by Interop.Sys.CreateSocketEventBuffer (called with EventBufferCount);
        // null until that call succeeds.
        private readonly Interop.Sys.SocketEvent* _buffer;

        //
        // Maps handle values to SocketAsyncContext instances.
        // Entries are added by TryRegisterCore, removed by UnregisterSocket, and looked up on the event thread.
        //
        private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

        //
        // Queue of events generated by EventLoop() that would be processed by the thread pool
        // (dequeued in IThreadPoolWorkItem.Execute).
        //
        private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();
 
        // The scheme works as follows:
        // - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
        // - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
        // - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
        //   or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient).
        //
        // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
        // Another work item isn't enqueued to the thread pool hastily while the state is Determining,
        // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
        private enum EventQueueProcessingStage
        {
            // No work item is queued for the event queue. This is value 0, i.e. the initial value of
            // _eventQueueProcessingStage.
            NotScheduled,
            // A work item is about to try to dequeue an event and will decide the next stage based on the outcome.
            Determining,
            // Events have been enqueued and a work item to process them is queued (or is being queued).
            Scheduled
        }

        // Current EventQueueProcessingStage value, kept as an int so it can be passed to
        // Interlocked.Exchange / Interlocked.CompareExchange.
        private int _eventQueueProcessingStage;
 
        //
        // Selects the next engine in turn and registers the socket handle with it.
        // On success, 'engine' is the engine holding the registration; on failure it is null and
        // 'error' carries the error from the registration attempt.
        //
        public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine, out Interop.Error error)
        {
            int ticket = Interlocked.Increment(ref s_allocateFromEngine);

            // The counter may wrap to negative values, hence the Math.Abs around the remainder.
            SocketAsyncEngine candidate = s_engines[Math.Abs(ticket % s_engines.Length)];

            if (candidate.TryRegisterCore(socketHandle, context, out error))
            {
                engine = candidate;
                return true;
            }

            engine = null;
            return false;
        }
 
        // Records the handle in this engine's map, then asks the native event port to report Read and Write
        // events for it. Returns true on success; on a native failure the map entry is removed again and
        // 'error' holds the failure code. Throws InvalidOperationException if the handle is already present.
        private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
        {
            if (!_handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context)))
            {
                // Using public SafeSocketHandle(IntPtr) a user can add the same handle
                // from a different Socket instance.
                throw new InvalidOperationException(SR.net_sockets_handle_already_used);
            }

            Interop.Sys.SocketEvents currentEvents = Interop.Sys.SocketEvents.None;
            Interop.Sys.SocketEvents requestedEvents = Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;

            error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, currentEvents, requestedEvents, socketHandle);

            bool succeeded = error == Interop.Error.SUCCESS;
            if (!succeeded)
            {
                // Roll back the map entry added above.
                _handleToContextMap.TryRemove(socketHandle, out _);
            }

            return succeeded;
        }
 
        // Removes the handle from this engine's handle-to-context map; the TryRemove result is ignored.
        public void UnregisterSocket(IntPtr socketHandle) =>
            _handleToContextMap.TryRemove(socketHandle, out _);
 
        // Creates the native event port and event buffer, then starts the background thread that runs EventLoop().
        // If any step throws, the native resources created so far are released and the exception is rethrown.
        private SocketAsyncEngine()
        {
            // Sentinel checked by FreeNativeResources() to tell whether the port was ever created.
            _port = (IntPtr)(-1);
            try
            {
                Interop.Error status;

                // Event port.
                fixed (IntPtr* port = &_port)
                {
                    status = Interop.Sys.CreateSocketEventPort(port);
                }

                if (status != Interop.Error.SUCCESS)
                {
                    throw new InternalException(status);
                }

                // Event buffer.
                fixed (Interop.Sys.SocketEvent** buffer = &_buffer)
                {
                    status = Interop.Sys.CreateSocketEventBuffer(EventBufferCount, buffer);
                }

                if (status != Interop.Error.SUCCESS)
                {
                    throw new InternalException(status);
                }

                // Dedicated background thread running the event loop for this engine.
                Thread eventThread = new Thread(static state => ((SocketAsyncEngine)state!).EventLoop());
                eventThread.IsBackground = true;
                eventThread.Name = ".NET Sockets";
                eventThread.UnsafeStart(this);
            }
            catch
            {
                FreeNativeResources();
                throw;
            }
        }
 
        // Body of the engine's event thread. Repeatedly waits on the native event port, hands the received
        // events to SocketEventHandler, and queues this engine as a thread pool work item when events were
        // enqueued and no work item was already scheduled. Any exception terminates the process via FailFast.
        private void EventLoop()
        {
            try
            {
                var eventHandler = new SocketEventHandler(this);
                for (;;)
                {
                    int numEvents = EventBufferCount;
                    Interop.Error waitResult = Interop.Sys.WaitForSocketEvents(_port, eventHandler.Buffer, &numEvents);
                    if (waitResult != Interop.Error.SUCCESS)
                    {
                        throw new InternalException(waitResult);
                    }

                    // The native shim is responsible for ensuring this condition.
                    Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

                    // Nothing was put on the queue, so the processing stage must not be touched.
                    if (!eventHandler.HandleSocketEvents(numEvents))
                    {
                        continue;
                    }

                    // Only enqueue a work item if the stage was NotScheduled.
                    // Otherwise there must be a work item already queued or another thread already handling parallelization.
                    int previousStage = Interlocked.Exchange(
                        ref _eventQueueProcessingStage,
                        (int)EventQueueProcessingStage.Scheduled);
                    if (previousStage == (int)EventQueueProcessingStage.NotScheduled)
                    {
                        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
                    }
                }
            }
            catch (Exception fault)
            {
                Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + fault.ToString(), fault);
            }
        }
 
        // Called by a work item right after it has dequeued an event. Moves the processing stage forward and,
        // unless the queue is empty and nobody enqueued in the meantime, queues another work item.
        private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
        {
            if (isEventQueueEmpty)
            {
                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
                // would not have scheduled a work item to process the work, so schedule one now.
                int previousStage = Interlocked.CompareExchange(
                    ref _eventQueueProcessingStage,
                    (int)EventQueueProcessingStage.NotScheduled,
                    (int)EventQueueProcessingStage.Determining);
                Debug.Assert(previousStage != (int)EventQueueProcessingStage.NotScheduled);

                if (previousStage == (int)EventQueueProcessingStage.Determining)
                {
                    return;
                }
            }
            else
            {
                // There are more events to process, set stage to Scheduled and enqueue a work item.
                _eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled;
            }

            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
        }
 
        // Thread pool entry point for this engine (IThreadPoolWorkItem). It first claims one event from
        // _eventQueue while moving the processing stage from Scheduled through Determining, then processes
        // events via SocketAsyncContext.HandleEvents until the queue is empty or about 15 ms (measured with
        // Environment.TickCount) have elapsed.
        void IThreadPoolWorkItem.Execute()
        {
            ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
            SocketIOEvent ev;
            while (true)
            {
                Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled);

                // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
                // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
                // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
                // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
                // Scheduled, and try to dequeue again or request another thread.
                _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining;
                Interlocked.MemoryBarrier();

                if (eventQueue.TryDequeue(out ev))
                {
                    break;
                }

                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
                // would not have scheduled a work item to process the work, so try to dequeue a work item again.
                int stageBeforeUpdate =
                    Interlocked.CompareExchange(
                        ref _eventQueueProcessingStage,
                        (int)EventQueueProcessingStage.NotScheduled,
                        (int)EventQueueProcessingStage.Determining);
                Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled);
                if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining)
                {
                    return;
                }
            }

            // An event has been claimed. Leave the Determining stage before processing it; this call queues
            // another work item when the queue still has events (or when an enqueuer set Scheduled meanwhile).
            UpdateEventQueueProcessingStage(eventQueue.IsEmpty);

            int startTimeMs = Environment.TickCount;
            do
            {
                ev.Context.HandleEvents(ev.Events);

                // If there is a constant stream of new events, and/or if user callbacks take long to process an event, this
                // work item may run for a long time. If work items of this type are using up all of the thread pool threads,
                // collectively they may starve other types of work items from running. Before dequeuing and processing another
                // event, check the elapsed time since the start of the work item and yield the thread after some time has
                // elapsed to allow the thread pool to run other work items.
                //
                // The threshold chosen below was based on trying various thresholds and in trying to keep the latency of
                // running another work item low when these work items are using up all of the thread pool worker threads. In
                // such cases, the latency would be something like threshold / proc count. Smaller thresholds were tried and
                // using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater
                // impact on throughput compared to the threshold chosen below, though it is slight enough that it may not
                // matter much. Higher thresholds didn't seem to have any noticeable effect.
            } while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev));
        }
 
        // Releases the native event buffer and event port, skipping whichever was never created
        // (a null buffer, or a port still holding the -1 sentinel set by the constructor).
        private void FreeNativeResources()
        {
            Interop.Sys.SocketEvent* eventBuffer = _buffer;
            if (eventBuffer != null)
            {
                Interop.Sys.FreeSocketEventBuffer(eventBuffer);
            }

            IntPtr eventPort = _port;
            if (eventPort != (IntPtr)(-1))
            {
                Interop.Sys.CloseSocketEventPort(eventPort);
            }
        }
 
        // The JIT may extend the lifetime of locals arbitrarily. Locals holding SocketAsyncContext references could
        // therefore keep Socket instances from being finalized even after user code has dropped them.
        // To prevent that, event handling lives in a method that is never inlined into the event loop.
        // See discussion: https://github.com/dotnet/runtime/issues/37064
        // SocketEventHandler is an on-stack snapshot of the SocketAsyncEngine members that method needs.
        private readonly struct SocketEventHandler
        {
            public Interop.Sys.SocketEvent* Buffer { get; }

            private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap;
            private readonly ConcurrentQueue<SocketIOEvent> _eventQueue;

            public SocketEventHandler(SocketAsyncEngine engine)
            {
                _eventQueue = engine._eventQueue;
                _handleToContextMap = engine._handleToContextMap;
                Buffer = engine._buffer;
            }

            // Processes the first numEvents entries of Buffer. Events for handles that are no longer in the map
            // are skipped. Returns true when at least one event was placed on the event queue.
            [MethodImpl(MethodImplOptions.NoInlining)]
            public bool HandleSocketEvents(int numEvents)
            {
                var received = new ReadOnlySpan<Interop.Sys.SocketEvent>(Buffer, numEvents);
                bool anyQueued = false;

                for (int i = 0; i < received.Length; i++)
                {
                    Interop.Sys.SocketEvent current = received[i];

                    if (!_handleToContextMap.TryGetValue(current.Data, out SocketAsyncContextWrapper wrapper))
                    {
                        continue;
                    }

                    SocketAsyncContext context = wrapper.Context;

                    if (context.PreferInlineCompletions)
                    {
                        context.HandleEventsInline(current.Events);
                        continue;
                    }

                    Interop.Sys.SocketEvents remaining = context.HandleSyncEventsSpeculatively(current.Events);
                    if (remaining == Interop.Sys.SocketEvents.None)
                    {
                        continue;
                    }

                    _eventQueue.Enqueue(new SocketIOEvent(context, remaining));
                    anyQueued = true;
                }

                return anyQueued;
            }
        }
 
        // Wrapping the context in a struct improves the epoll thread hot path by up to 3% on some TechEmpower benchmarks.
        // The point is to get a dedicated generic instantiation, so that lookups go through:
        // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
        // rather than the shared reference-type instantiation:
        // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
        private readonly struct SocketAsyncContextWrapper
        {
            internal SocketAsyncContext Context { get; }

            public SocketAsyncContextWrapper(SocketAsyncContext context)
            {
                Context = context;
            }
        }
 
        // Item stored in the engine's event queue: a context together with the events still to be handled for it.
        private readonly struct SocketIOEvent
        {
            public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events) =>
                (Context, Events) = (context, events);

            public SocketAsyncContext Context { get; }

            public Interop.Sys.SocketEvents Events { get; }
        }
    }
}