// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
namespace System.Net.Sockets
{
internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
{
private const int EventBufferCount =
#if DEBUG
32;
#else
1024;
#endif
// Socket continuations are dispatched to the ThreadPool from the event thread.
// This prevents continuations from blocking event handling.
// Setting PreferInlineCompletions to true instead allows continuations to run directly on the event thread.
// PreferInlineCompletions defaults to false and can be set to true using the DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS envvar.
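// As an illustrative example (not part of this file), a process could opt in at launch with:
//   DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS=1 dotnet MyServer.dll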
internal static readonly bool InlineSocketCompletionsEnabled = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";
private static int GetEngineCount()
{
// The responsibility of SocketAsyncEngine is to get notifications from epoll/kqueue
// and to schedule the corresponding work items (socket reads and writes) to the ThreadPool.
//
// Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load,
// we have observed that a single engine is capable of keeping up to thirty x64 CPU cores or eight arm64 CPU cores busy.
//
// The vast majority of real-life scenarios are never going to generate such a huge load (hundreds of thousands of requests per second),
// so a single producer should almost always be enough.
//
// We want to be sure that we can handle extreme loads, which is why we decided on these values.
//
// It's impossible to predict all possible scenarios, so the engine count can also be configured via the DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT environment variable.
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
{
return (int)count;
}
// When inlining continuations, we default to ProcessorCount to make sure event threads cannot be a bottleneck.
if (InlineSocketCompletionsEnabled)
{
return Environment.ProcessorCount;
}
Architecture architecture = RuntimeInformation.ProcessArchitecture;
int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
? 8
: 30;
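// Illustrative examples of the resulting count: a 64-core x64 machine gets Math.Max(1, Math.Round(64 / 30.0)) = 2 engines,
// a 16-core arm64 machine gets Math.Max(1, Math.Round(16 / 8.0)) = 2, and an 8-core x64 machine gets Math.Max(1, Math.Round(8 / 30.0)) = 1.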
return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
}
private static readonly SocketAsyncEngine[] s_engines = CreateEngines();
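// Monotonically incremented by TryRegisterSocket to spread registrations across the engines round-robin.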
private static int s_allocateFromEngine = -1;
private static SocketAsyncEngine[] CreateEngines()
{
int engineCount = GetEngineCount();
var engines = new SocketAsyncEngine[engineCount];
for (int i = 0; i < engineCount; i++)
{
engines[i] = new SocketAsyncEngine();
}
return engines;
}
/// <summary>
/// Each <see cref="SocketAsyncContext"/> is assigned an index into this table while registered with a <see cref="SocketAsyncEngine"/>.
/// <para>The index is used as the <see cref="Interop.Sys.SocketEvent.Data"/> to quickly map events to <see cref="SocketAsyncContext"/>s.</para>
/// <para>It is also stored in <see cref="SocketAsyncContext.GlobalContextIndex"/> so that we can efficiently remove it when unregistering the socket.</para>
/// </summary>
private static SocketAsyncContext?[] s_registeredContexts = [];
private static readonly Queue<int> s_registeredContextsFreeList = [];
private readonly IntPtr _port;
private readonly Interop.Sys.SocketEvent* _buffer;
//
// Queue of events generated by EventLoop() to be processed by the thread pool
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();
// The scheme works as follows:
// - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
// - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
// - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
// or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient).
//
// The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
// Another work item isn't enqueued to the thread pool hastily while the state is Determining;
// instead, the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
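//
// Summarized transitions:
//   NotScheduled -> Scheduled    (new events were enqueued and a work item was queued)
//   Scheduled    -> Determining  (a worker is about to try dequeueing an event)
//   Determining  -> NotScheduled (the queue is empty; nothing left to do)
//   Determining  -> Scheduled    (the queue is non-empty; the current work item keeps processing)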
private enum EventQueueProcessingStage
{
NotScheduled,
Determining,
Scheduled
}
private EventQueueProcessingStage _eventQueueProcessingStage;
//
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
//
public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine, out Interop.Error error)
{
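// Spread registrations across the engines round-robin. Math.Abs keeps the index non-negative
// once Interlocked.Increment eventually wraps the counter past int.MaxValue.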
int engineIndex = Math.Abs(Interlocked.Increment(ref s_allocateFromEngine) % s_engines.Length);
SocketAsyncEngine nextEngine = s_engines[engineIndex];
bool registered = nextEngine.TryRegisterCore(socketHandle, context, out error);
engine = registered ? nextEngine : null;
return registered;
}
private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
{
Debug.Assert(context.GlobalContextIndex == -1);
lock (s_registeredContextsFreeList)
{
if (!s_registeredContextsFreeList.TryDequeue(out int index))
{
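// The free list is empty, so grow the table geometrically. Slot 'previousLength' is used for this
// registration below; the remaining newly added slots are pushed onto the free list.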
int previousLength = s_registeredContexts.Length;
int newLength = Math.Max(4, 2 * previousLength);
Array.Resize(ref s_registeredContexts, newLength);
for (int i = previousLength + 1; i < newLength; i++)
{
s_registeredContextsFreeList.Enqueue(i);
}
index = previousLength;
}
Debug.Assert(s_registeredContexts[index] is null);
s_registeredContexts[index] = context;
context.GlobalContextIndex = index;
}
error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, context.GlobalContextIndex);
if (error == Interop.Error.SUCCESS)
{
return true;
}
UnregisterSocket(context);
return false;
}
public static void UnregisterSocket(SocketAsyncContext context)
{
Debug.Assert(context.GlobalContextIndex >= 0);
Debug.Assert(ReferenceEquals(s_registeredContexts[context.GlobalContextIndex], context));
lock (s_registeredContextsFreeList)
{
s_registeredContexts[context.GlobalContextIndex] = null;
s_registeredContextsFreeList.Enqueue(context.GlobalContextIndex);
}
context.GlobalContextIndex = -1;
}
private SocketAsyncEngine()
{
_port = (IntPtr)(-1);
try
{
//
// Create the event port and buffer
//
Interop.Error err;
fixed (IntPtr* portPtr = &_port)
{
err = Interop.Sys.CreateSocketEventPort(portPtr);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
}
fixed (Interop.Sys.SocketEvent** bufferPtr = &_buffer)
{
err = Interop.Sys.CreateSocketEventBuffer(EventBufferCount, bufferPtr);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
}
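// Start the long-running event loop thread. UnsafeStart avoids capturing the current
// ExecutionContext, which the event loop has no use for.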
var thread = new Thread(static s => ((SocketAsyncEngine)s!).EventLoop())
{
IsBackground = true,
Name = ".NET Sockets"
};
thread.UnsafeStart(this);
}
catch
{
FreeNativeResources();
throw;
}
}
private void EventLoop()
{
try
{
SocketEventHandler handler = new SocketEventHandler(this);
while (true)
{
int numEvents = EventBufferCount;
Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, handler.Buffer, &numEvents);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");
// Only enqueue a work item if the stage is NotScheduled.
// Otherwise there must be a work item already queued or another thread already handling parallelization.
if (handler.HandleSocketEvents(numEvents) &&
Interlocked.Exchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}
}
catch (Exception e)
{
Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e);
}
}
private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
{
if (!isEventQueueEmpty)
{
// There are more events to process, set stage to Scheduled and enqueue a work item.
_eventQueueProcessingStage = EventQueueProcessingStage.Scheduled;
}
else
{
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so schedule one now.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
}
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
void IThreadPoolWorkItem.Execute()
{
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
SocketIOEvent ev;
while (true)
{
Debug.Assert(_eventQueueProcessingStage == EventQueueProcessingStage.Scheduled);
// The change needs to be visible to other threads that may request a worker thread before a work item is attempted
// to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
_eventQueueProcessingStage = EventQueueProcessingStage.Determining;
Interlocked.MemoryBarrier();
if (eventQueue.TryDequeue(out ev))
{
break;
}
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue a work item again.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
}
UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
int startTimeMs = Environment.TickCount;
do
{
ev.Context.HandleEvents(ev.Events);
// If there is a constant stream of new events, and/or if user callbacks take long to process an event, this
// work item may run for a long time. If work items of this type are using up all of the thread pool threads,
// collectively they may starve other types of work items from running. Before dequeuing and processing another
// event, check the elapsed time since the start of the work item and yield the thread after some time has
// elapsed to allow the thread pool to run other work items.
//
// The threshold chosen below was based on trying various thresholds and in trying to keep the latency of
// running another work item low when these work items are using up all of the thread pool worker threads. In
// such cases, the latency would be something like threshold / proc count. Smaller thresholds were tried and
// using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater
// impact on throughput compared to the threshold chosen below, though it is slight enough that it may not
// matter much. Higher thresholds didn't seem to have any noticeable effect.
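// As an illustrative data point: with 16 saturated worker threads and the 15 ms threshold below,
// a newly queued work item would typically wait on the order of 15 / 16, i.e. about 1 ms, for a thread.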
} while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev));
}
private void FreeNativeResources()
{
if (_buffer != null)
{
Interop.Sys.FreeSocketEventBuffer(_buffer);
}
if (_port != (IntPtr)(-1))
{
Interop.Sys.CloseSocketEventPort(_port);
}
}
// The JIT is allowed to arbitrarily extend the lifetime of locals, which may retain SocketAsyncContext references,
// indirectly preventing Socket instances from being finalized even though they are no longer referenced by user code.
// To avoid this, the event handling logic is delegated to a non-inlined processing method.
// See discussion: https://github.com/dotnet/runtime/issues/37064
// SocketEventHandler holds an on-stack cache of SocketAsyncEngine members needed by the handler method.
private readonly struct SocketEventHandler
{
public Interop.Sys.SocketEvent* Buffer { get; }
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue;
public SocketEventHandler(SocketAsyncEngine engine)
{
Buffer = engine._buffer;
_eventQueue = engine._eventQueue;
}
[MethodImpl(MethodImplOptions.NoInlining)]
public bool HandleSocketEvents(int numEvents)
{
bool enqueuedEvent = false;
foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(Buffer, numEvents))
{
Debug.Assert((uint)socketEvent.Data < (uint)s_registeredContexts.Length);
// The context may be null if the socket was unregistered right before the event was processed.
// The slot in s_registeredContexts may have been reused by a different context, in which case the
// wrong socket will observe that no information is available yet and harmlessly retry, waiting for new events.
SocketAsyncContext? context = s_registeredContexts[(uint)socketEvent.Data];
if (context is not null)
{
if (context.PreferInlineCompletions)
{
context.HandleEventsInline(socketEvent.Events);
}
else
{
Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
if (events != Interop.Sys.SocketEvents.None)
{
_eventQueue.Enqueue(new SocketIOEvent(context, events));
enqueuedEvent = true;
}
}
}
}
return enqueuedEvent;
}
}
private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
public Interop.Sys.SocketEvents Events { get; }
public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events)
{
Context = context;
Events = events;
}
}
}
}