// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
namespace System.Net.Sockets
{
internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
{
private const int EventBufferCount =
#if DEBUG
32;
#else
1024;
#endif
// Socket continuations are dispatched to the ThreadPool from the event thread.
// This prevents continuations from blocking event handling.
// Setting PreferInlineCompletions allows continuations to run directly on the event thread instead.
// PreferInlineCompletions defaults to false and can be enabled by setting the
// DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS environment variable to "1".
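// For example: DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS=1 dotnet run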
internal static readonly bool InlineSocketCompletionsEnabled = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";
private static int GetEngineCount()
{
// The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
// and schedule corresponding work items to ThreadPool (socket reads and writes).
//
// Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load,
// we have observed that a single engine is capable of keeping up to thirty x64 CPU cores (or eight ARM64 cores) busy.
//
// The vast majority of real-life scenarios are never going to generate such a huge load (hundreds of thousands
// of requests per second), so a single engine should almost always be enough.
//
// We still want to be sure that we can handle extreme loads, which is why we have chosen these values.
//
// Since it's impossible to predict all possible scenarios, the engine count can also be configured via an environment variable.
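// For example, setting DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT=4 forces exactly four engines.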
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
{
return (int)count;
}
// When inlining continuations, we default to ProcessorCount to make sure event threads cannot be a bottleneck.
if (InlineSocketCompletionsEnabled)
{
return Environment.ProcessorCount;
}
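// Otherwise, aim for roughly one engine per 30 cores on x64 (or per 8 cores on ARM), rounded to the
// nearest whole engine, with a minimum of one. E.g. a 64-core x64 machine gets Math.Round(64 / 30.0) == 2 engines.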
Architecture architecture = RuntimeInformation.ProcessArchitecture;
int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
? 8
: 30;
return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
}
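// All engines are created eagerly when the type is first used; sockets are then assigned
// to engines in round-robin fashion (see TryRegisterSocket).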
private static readonly SocketAsyncEngine[] s_engines = CreateEngines();
private static int s_allocateFromEngine = -1;
private static SocketAsyncEngine[] CreateEngines()
{
int engineCount = GetEngineCount();
var engines = new SocketAsyncEngine[engineCount];
for (int i = 0; i < engineCount; i++)
{
engines[i] = new SocketAsyncEngine();
}
return engines;
}
private readonly IntPtr _port; // Native event port handle (e.g. an epoll or kqueue descriptor).
private readonly Interop.Sys.SocketEvent* _buffer; // Native buffer that receives up to EventBufferCount events per wait.
//
// Maps handle values to SocketAsyncContext instances.
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();
//
// Queue of events generated by EventLoop() to be processed by the thread pool
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();
// The scheme works as follows:
// - From NotScheduled, the only transition is to Scheduled, when new events are enqueued and a work item is enqueued to process them.
// - From Scheduled, the only transition is to Determining, right before trying to dequeue an event.
// - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them),
//   or Scheduled if the queue is still not empty (letting the current work item handle parallelization as convenient).
//
// The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
// Another work item isn't hastily enqueued to the thread pool while the stage is Determining;
// instead, the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
private enum EventQueueProcessingStage
{
NotScheduled,
Determining,
Scheduled
}
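// The current stage of the state machine described above. Mutated both via interlocked operations
// and via plain stores (with an explicit memory barrier where ordering matters; see Execute).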
private EventQueueProcessingStage _eventQueueProcessingStage;
//
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
//
public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine, out Interop.Error error)
{
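// Pick the next engine in round-robin fashion. Math.Abs guards against the counter wrapping to
// negative values; the remainder's magnitude is always less than s_engines.Length, so Abs cannot overflow.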
int engineIndex = Math.Abs(Interlocked.Increment(ref s_allocateFromEngine) % s_engines.Length);
SocketAsyncEngine nextEngine = s_engines[engineIndex];
bool registered = nextEngine.TryRegisterCore(socketHandle, context, out error);
engine = registered ? nextEngine : null;
return registered;
}
private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
{
bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context));
if (!added)
{
// Using the public SafeSocketHandle(IntPtr) constructor, a user can add the same handle
// from a different Socket instance.
throw new InvalidOperationException(SR.net_sockets_handle_already_used);
}
error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle);
if (error == Interop.Error.SUCCESS)
{
return true;
}
_handleToContextMap.TryRemove(socketHandle, out _);
return false;
}
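//
// Unregisters a socket previously registered via TryRegisterSocket. The context argument is unused.
//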
public void UnregisterSocket(IntPtr socketHandle, SocketAsyncContext __)
{
_handleToContextMap.TryRemove(socketHandle, out _);
}
private SocketAsyncEngine()
{
_port = (IntPtr)(-1);
try
{
//
// Create the event port and buffer
//
Interop.Error err;
fixed (IntPtr* portPtr = &_port)
{
err = Interop.Sys.CreateSocketEventPort(portPtr);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
}
fixed (Interop.Sys.SocketEvent** bufferPtr = &_buffer)
{
err = Interop.Sys.CreateSocketEventBuffer(EventBufferCount, bufferPtr);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
}
var thread = new Thread(static s => ((SocketAsyncEngine)s!).EventLoop())
{
IsBackground = true,
Name = ".NET Sockets"
};
thread.UnsafeStart(this);
}
catch
{
FreeNativeResources();
throw;
}
}
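//
// Body of the dedicated event thread: waits for events from the native event port and hands
// them to SocketEventHandler in a loop. Any escaping exception takes the process down.
//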
private void EventLoop()
{
try
{
SocketEventHandler handler = new SocketEventHandler(this);
while (true)
{
int numEvents = EventBufferCount;
Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, handler.Buffer, &numEvents);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
}
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");
// Only enqueue a work item if the stage is NotScheduled.
// Otherwise there must be a work item already queued or another thread already handling parallelization.
if (handler.HandleSocketEvents(numEvents) &&
Interlocked.Exchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}
}
catch (Exception e)
{
Environment.FailFast("Exception thrown from SocketAsyncEngine event loop: " + e.ToString(), e);
}
}
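//
// Called by Execute after dequeueing an event to decide whether another work item is needed
// to continue draining the event queue.
//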
private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
{
if (!isEventQueueEmpty)
{
// There are more events to process; set the stage to Scheduled and enqueue a work item.
_eventQueueProcessingStage = EventQueueProcessingStage.Scheduled;
}
else
{
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so schedule one now.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
}
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
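//
// Thread pool work item entry point. Dequeues events from _eventQueue and runs their handlers,
// periodically yielding the thread back to the pool so that other work items can run.
//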
void IThreadPoolWorkItem.Execute()
{
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
SocketIOEvent ev;
while (true)
{
Debug.Assert(_eventQueueProcessingStage == EventQueueProcessingStage.Scheduled);
// The change needs to be visible to other threads that may request a worker thread before a work item is attempted
// to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
_eventQueueProcessingStage = EventQueueProcessingStage.Determining;
Interlocked.MemoryBarrier();
if (eventQueue.TryDequeue(out ev))
{
break;
}
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue an event again.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
}
UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
int startTimeMs = Environment.TickCount;
do
{
ev.Context.HandleEvents(ev.Events);
// If there is a constant stream of new events, and/or if user callbacks take long to process an event, this
// work item may run for a long time. If work items of this type are using up all of the thread pool threads,
// collectively they may starve other types of work items from running. Before dequeuing and processing another
// event, check the elapsed time since the start of the work item and yield the thread after some time has
// elapsed to allow the thread pool to run other work items.
//
// The threshold chosen below was based on trying various thresholds and on trying to keep the latency of
// running another work item low when these work items are using up all of the thread pool worker threads. In
// such cases, the latency would be something like threshold / proc count. Smaller thresholds (like 1 ms,
// 5 ms, etc., measured with Stopwatch instead) were also tried; from quick tests they appeared to have a
// slightly greater impact on throughput compared to the threshold chosen below, though the difference is
// slight enough that it may not matter much. Higher thresholds didn't seem to have any noticeable effect.
} while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev));
}
private void FreeNativeResources()
{
if (_buffer != null)
{
Interop.Sys.FreeSocketEventBuffer(_buffer);
}
if (_port != (IntPtr)(-1))
{
Interop.Sys.CloseSocketEventPort(_port);
}
}
// The JIT is allowed to arbitrarily extend the lifetime of locals, which may retain SocketAsyncContext references,
// indirectly preventing Socket instances from being finalized despite no longer being referenced by user code.
// To avoid this, the event handling logic is delegated to a non-inlined processing method.
// See discussion: https://github.com/dotnet/runtime/issues/37064
// SocketEventHandler holds an on-stack cache of the SocketAsyncEngine members needed by the handler method.
private readonly struct SocketEventHandler
{
public Interop.Sys.SocketEvent* Buffer { get; }
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap;
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue;
public SocketEventHandler(SocketAsyncEngine engine)
{
Buffer = engine._buffer;
_handleToContextMap = engine._handleToContextMap;
_eventQueue = engine._eventQueue;
}
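// Processes a batch of events from the native buffer. Returns true if at least one event was
// enqueued to _eventQueue, in which case the caller may need to schedule a work item to process it.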
[MethodImpl(MethodImplOptions.NoInlining)]
public bool HandleSocketEvents(int numEvents)
{
bool enqueuedEvent = false;
foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(Buffer, numEvents))
{
if (_handleToContextMap.TryGetValue(socketEvent.Data, out SocketAsyncContextWrapper contextWrapper))
{
SocketAsyncContext context = contextWrapper.Context;
if (context.PreferInlineCompletions)
{
context.HandleEventsInline(socketEvent.Events);
}
else
{
Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
if (events != Interop.Sys.SocketEvents.None)
{
_eventQueue.Enqueue(new SocketIOEvent(context, events));
enqueuedEvent = true;
}
}
}
}
return enqueuedEvent;
}
}
// A struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% on some TechEmpower benchmarks.
// The goal is to get a dedicated generic instantiation, so that lookups use:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
// instead of the shared:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
private readonly struct SocketAsyncContextWrapper
{
public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;
internal SocketAsyncContext Context { get; }
}
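// A registered socket's pending events, queued by HandleSocketEvents and consumed by Execute
// on the thread pool.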
private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
public Interop.Sys.SocketEvents Events { get; }
public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events)
{
Context = context;
Events = events;
}
}
}
}