fix: kcp2p V1.30 (#3391)

- fix: set send/recv buffer sizes directly instead of iterating to find the limit.
  fixes: https://github.com/MirrorNetworking/Mirror/issues/3390
- fix: server & client sockets are now always non-blocking to ensure main thread never
  blocks on socket.recv/send. Send() now also handles WouldBlock.
- fix: socket.Receive/From directly with non-blocking sockets and handle WouldBlock,
  instead of socket.Poll. faster, more obvious, and fixes Poll() looping forever while
  socket is in error state. fixes: https://github.com/MirrorNetworking/Mirror/issues/2733
This commit is contained in:
mischa 2023-02-23 03:14:31 +01:00 committed by GitHub
parent af787b8f06
commit 228a577683
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 141 additions and 114 deletions

View File

@ -27,16 +27,20 @@ public class KcpTransport : Transport
public uint Interval = 10; public uint Interval = 10;
[Tooltip("KCP timeout in milliseconds. Note that KCP sends a ping automatically.")] [Tooltip("KCP timeout in milliseconds. Note that KCP sends a ping automatically.")]
public int Timeout = 10000; public int Timeout = 10000;
[Tooltip("Socket receive buffer size. Large buffer helps support more connections. Increase operating system socket buffer size limits if needed.")]
public int RecvBufferSize = 1024 * 1027 * 7;
[Tooltip("Socket send buffer size. Large buffer helps support more connections. Increase operating system socket buffer size limits if needed.")]
public int SendBufferSize = 1024 * 1027 * 7;
[Header("Advanced")] [Header("Advanced")]
[Tooltip("KCP fastresend parameter. Faster resend for the cost of higher bandwidth. 0 in normal mode, 2 in turbo mode.")] [Tooltip("KCP fastresend parameter. Faster resend for the cost of higher bandwidth. 0 in normal mode, 2 in turbo mode.")]
public int FastResend = 2; public int FastResend = 2;
[Tooltip("KCP congestion window. Restricts window size to reduce congestion. Results in only 2-3 MTU messages per Flush even on loopback. Best to keept his disabled.")] [Tooltip("KCP congestion window. Restricts window size to reduce congestion. Results in only 2-3 MTU messages per Flush even on loopback. Best to keept his disabled.")]
/*public*/ bool CongestionWindow = false; // KCP 'NoCongestionWindow' is false by default. here we negate it for ease of use. /*public*/ bool CongestionWindow = false; // KCP 'NoCongestionWindow' is false by default. here we negate it for ease of use.
[Tooltip("KCP window size can be modified to support higher loads.")]
public uint SendWindowSize = 4096; //Kcp.WND_SND; 32 by default. Mirror sends a lot, so we need a lot more.
[Tooltip("KCP window size can be modified to support higher loads. This also increases max message size.")] [Tooltip("KCP window size can be modified to support higher loads. This also increases max message size.")]
public uint ReceiveWindowSize = 4096; //Kcp.WND_RCV; 128 by default. Mirror sends a lot, so we need a lot more. public uint ReceiveWindowSize = 4096; //Kcp.WND_RCV; 128 by default. Mirror sends a lot, so we need a lot more.
[Tooltip("KCP window size can be modified to support higher loads.")]
public uint SendWindowSize = 4096; //Kcp.WND_SND; 32 by default. Mirror sends a lot, so we need a lot more.
[Tooltip("KCP will try to retransmit lost messages up to MaxRetransmit (aka dead_link) before disconnecting.")] [Tooltip("KCP will try to retransmit lost messages up to MaxRetransmit (aka dead_link) before disconnecting.")]
public uint MaxRetransmit = Kcp.DEADLINK * 2; // default prematurely disconnects a lot of people (#3022). use 2x. public uint MaxRetransmit = Kcp.DEADLINK * 2; // default prematurely disconnects a lot of people (#3022). use 2x.
[Tooltip("Enable to automatically set client & server send/recv buffers to OS limit. Avoids issues with too small buffers under heavy load, potentially dropping connections. Increase the OS limit if this is still too small.")] [Tooltip("Enable to automatically set client & server send/recv buffers to OS limit. Avoids issues with too small buffers under heavy load, potentially dropping connections. Increase the OS limit if this is still too small.")]
@ -101,7 +105,7 @@ void Awake()
Log.Error = Debug.LogError; Log.Error = Debug.LogError;
// create config from serialized settings // create config from serialized settings
config = new KcpConfig(DualMode, MaximizeSocketBuffers, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmit); config = new KcpConfig(DualMode, RecvBufferSize, SendBufferSize, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmit);
// client (NonAlloc version is not necessary anymore) // client (NonAlloc version is not necessary anymore)
client = new KcpClient( client = new KcpClient(

View File

@ -1,3 +1,12 @@
V1.30 [2023-02-20]
- fix: set send/recv buffer sizes directly instead of iterating to find the limit.
fixes: https://github.com/MirrorNetworking/Mirror/issues/3390
- fix: server & client sockets are now always non-blocking to ensure main thread never
blocks on socket.recv/send. Send() now also handles WouldBlock.
- fix: socket.Receive/From directly with non-blocking sockets and handle WouldBlock,
instead of socket.Poll. faster, more obvious, and fixes Poll() looping forever while
socket is in error state. fixes: https://github.com/MirrorNetworking/Mirror/issues/2733
V1.29 [2023-01-28] V1.29 [2023-01-28]
- fix: KcpServer.CreateServerSocket now handles NotSupportedException when setting DualMode - fix: KcpServer.CreateServerSocket now handles NotSupportedException when setting DualMode
https://github.com/MirrorNetworking/Mirror/issues/3358 https://github.com/MirrorNetworking/Mirror/issues/3358

View File

@ -24,17 +24,26 @@ public static bool ResolveHostname(string hostname, out IPAddress[] addresses)
// if connections drop under heavy load, increase to OS limit. // if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit. // if still not enough, increase the OS limit.
public static void MaximizeSocketBuffers(Socket socket) public static void ConfigureSocketBuffers(Socket socket, int recvBufferSize, int sendBufferSize)
{ {
// log initial size for comparison. // log initial size for comparison.
// remember initial size for log comparison // remember initial size for log comparison
int initialReceive = socket.ReceiveBufferSize; int initialReceive = socket.ReceiveBufferSize;
int initialSend = socket.SendBufferSize; int initialSend = socket.SendBufferSize;
socket.SetReceiveBufferToOSLimit(); // set to configured size
socket.SetSendBufferToOSLimit(); try
{
socket.ReceiveBufferSize = recvBufferSize;
socket.SendBufferSize = sendBufferSize;
}
catch (SocketException)
{
Log.Warning($"Kcp: failed to set Socket RecvBufSize = {recvBufferSize} SendBufSize = {sendBufferSize}");
}
Log.Info($"Kcp: RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x) maximized to OS limits!");
Log.Info($"Kcp: RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x)");
} }
} }
} }

View File

@ -1,33 +1,6 @@
using System.Net.Sockets;
namespace kcp2k namespace kcp2k
{ {
public static class Extensions public static class Extensions
{ {
// 100k attempts of 1 KB increases = default + 100 MB max
public static void SetReceiveBufferToOSLimit(this Socket socket, int stepSize = 1024, int attempts = 100_000)
{
// setting a too large size throws a socket exception.
// so let's keep increasing until we encounter it.
for (int i = 0; i < attempts; ++i)
{
// increase in 1 KB steps
try { socket.ReceiveBufferSize += stepSize; }
catch (SocketException) { break; }
}
}
// 100k attempts of 1 KB increases = default + 100 MB max
public static void SetSendBufferToOSLimit(this Socket socket, int stepSize = 1024, int attempts = 100_000)
{
// setting a too large size throws a socket exception.
// so let's keep increasing until we encounter it.
for (int i = 0; i < attempts; ++i)
{
// increase in 1 KB steps
try { socket.SendBufferSize += stepSize; }
catch (SocketException) { break; }
}
}
} }
} }

View File

@ -97,15 +97,13 @@ void OnDisconnectedWrap()
remoteEndPoint = new IPEndPoint(addresses[0], port); remoteEndPoint = new IPEndPoint(addresses[0], port);
socket = new Socket(remoteEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); socket = new Socket(remoteEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
// configure buffer sizes: // recv & send are called from main thread.
// if connections drop under heavy load, increase to OS limit. // need to ensure this never blocks.
// if still not enough, increase the OS limit. // even a 1ms block per connection would stop us from scaling.
if (config.MaximizeSocketBuffers) socket.Blocking = false;
{
Common.MaximizeSocketBuffers(socket); // configure buffer sizes
} Common.ConfigureSocketBuffers(socket, config.RecvBufferSize, config.SendBufferSize);
// otherwise still log the defaults for info.
else Log.Info($"KcpClient: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(KcpConfig.MaximizeSocketBuffers)} to increase it to OS limit. If they still drop, increase the OS limit.");
// bind to endpoint so we can use send/recv instead of sendto/recvfrom. // bind to endpoint so we can use send/recv instead of sendto/recvfrom.
socket.Connect(remoteEndPoint); socket.Connect(remoteEndPoint);
@ -121,42 +119,56 @@ void OnDisconnectedWrap()
protected virtual bool RawReceive(out ArraySegment<byte> segment) protected virtual bool RawReceive(out ArraySegment<byte> segment)
{ {
segment = default; segment = default;
if (socket == null) return false;
try try
{ {
if (socket != null && socket.Poll(0, SelectMode.SelectRead)) // ReceiveFrom allocates. we used bound Receive.
{ // returns amount of bytes written into buffer.
// ReceiveFrom allocates. we used bound Receive. // throws SocketException if datagram was larger than buffer.
// returns amount of bytes written into buffer. // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
// throws SocketException if datagram was larger than buffer. int msgLength = socket.Receive(rawReceiveBuffer);
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
int msgLength = socket.Receive(rawReceiveBuffer);
//Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}"); //Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
segment = new ArraySegment<byte>(rawReceiveBuffer, 0, msgLength); segment = new ArraySegment<byte>(rawReceiveBuffer, 0, msgLength);
return true; return true;
}
} }
// this is fine, the socket might have been closed in the other end // for non-blocking sockets, Receive throws WouldBlock if there is
catch (SocketException ex) // no message to read. that's okay. only log for other errors.
catch (SocketException e)
{ {
// the other end closing the connection is not an 'error'. if (e.SocketErrorCode != SocketError.WouldBlock)
// but connections should never just end silently. {
// at least log a message for easier debugging. // the other end closing the connection is not an 'error'.
// for example, his can happen when connecting without a server. // but connections should never just end silently.
// see test: ConnectWithoutServer(). // at least log a message for easier debugging.
Log.Info($"KcpClient: looks like the other end has closed the connection. This is fine: {ex}"); // for example, his can happen when connecting without a server.
peer.Disconnect(); // see test: ConnectWithoutServer().
Log.Info($"KcpClient: looks like the other end has closed the connection. This is fine: {e}");
peer.Disconnect();
}
// WouldBlock indicates there's no data yet, so return false.
return false;
} }
return false;
} }
// io - output. // io - output.
// virtual so it may be modified for relays, etc. // virtual so it may be modified for relays, etc.
protected virtual void RawSend(ArraySegment<byte> data) protected virtual void RawSend(ArraySegment<byte> data)
{ {
socket.Send(data.Array, data.Offset, data.Count, SocketFlags.None); try
{
socket.Send(data.Array, data.Offset, data.Count, SocketFlags.None);
}
// for non-blocking sockets, SendTo may throw WouldBlock.
// in that case, simply drop the message. it's UDP, it's fine.
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
{
Log.Error($"KcpClient: Send failed: {e}");
}
}
} }
public void Send(ArraySegment<byte> segment, KcpChannel channel) public void Send(ArraySegment<byte> segment, KcpChannel channel)

View File

@ -13,10 +13,15 @@ public class KcpConfig
// (Nintendo Switch, etc.) // (Nintendo Switch, etc.)
public bool DualMode; public bool DualMode;
// attempt to maximize socket send/recv buffers to OS limit. // UDP servers use only one socket.
// too small send/receive buffers might cause connection drops under // maximize buffer to handle as many connections as possible.
// heavy load. using the OS max size can make a difference already. //
public bool MaximizeSocketBuffers; // M1 mac pro:
// recv buffer default: 786896 (771 KB)
// send buffer default: 9216 (9 KB)
// max configurable: ~7 MB
public int RecvBufferSize;
public int SendBufferSize;
// kcp configuration /////////////////////////////////////////////////// // kcp configuration ///////////////////////////////////////////////////
// NoDelay is recommended to reduce latency. This also scales better // NoDelay is recommended to reduce latency. This also scales better
@ -59,7 +64,8 @@ public class KcpConfig
// makes it easy to define "new KcpConfig(DualMode=false)" etc. // makes it easy to define "new KcpConfig(DualMode=false)" etc.
public KcpConfig( public KcpConfig(
bool DualMode = true, bool DualMode = true,
bool MaximizeSocketBuffers = false, int RecvBufferSize = 1024 * 1024 * 7,
int SendBufferSize = 1024 * 1024 * 7,
bool NoDelay = true, bool NoDelay = true,
uint Interval = 10, uint Interval = 10,
int FastResend = 0, int FastResend = 0,
@ -70,7 +76,8 @@ public KcpConfig(
uint MaxRetransmits = Kcp.DEADLINK) uint MaxRetransmits = Kcp.DEADLINK)
{ {
this.DualMode = DualMode; this.DualMode = DualMode;
this.MaximizeSocketBuffers = MaximizeSocketBuffers; this.RecvBufferSize = RecvBufferSize;
this.SendBufferSize = SendBufferSize;
this.NoDelay = NoDelay; this.NoDelay = NoDelay;
this.Interval = Interval; this.Interval = Interval;
this.FastResend = FastResend; this.FastResend = FastResend;

View File

@ -103,15 +103,13 @@ public virtual void Start(ushort port)
// listen // listen
socket = CreateServerSocket(config.DualMode, port); socket = CreateServerSocket(config.DualMode, port);
// configure buffer sizes: // recv & send are called from main thread.
// if connections drop under heavy load, increase to OS limit. // need to ensure this never blocks.
// if still not enough, increase the OS limit. // even a 1ms block per connection would stop us from scaling.
if (config.MaximizeSocketBuffers) socket.Blocking = false;
{
Common.MaximizeSocketBuffers(socket); // configure buffer sizes
} Common.ConfigureSocketBuffers(socket, config.RecvBufferSize, config.SendBufferSize);
// otherwise still log the defaults for info.
else Log.Info($"KcpServer: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(KcpConfig.MaximizeSocketBuffers)} to increase it to OS limit. If they still drop, increase the OS limit.");
} }
public void Send(int connectionId, ArraySegment<byte> segment, KcpChannel channel) public void Send(int connectionId, ArraySegment<byte> segment, KcpChannel channel)
@ -149,45 +147,48 @@ protected virtual bool RawReceiveFrom(out ArraySegment<byte> segment, out int co
{ {
segment = default; segment = default;
connectionId = 0; connectionId = 0;
if (socket == null) return false;
try try
{ {
if (socket != null && socket.Poll(0, SelectMode.SelectRead)) // NOTE: ReceiveFrom allocates.
{ // we pass our IPEndPoint to ReceiveFrom.
// NOTE: ReceiveFrom allocates. // receive from calls newClientEP.Create(socketAddr).
// we pass our IPEndPoint to ReceiveFrom. // IPEndPoint.Create always returns a new IPEndPoint.
// receive from calls newClientEP.Create(socketAddr). // https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761
// IPEndPoint.Create always returns a new IPEndPoint. //
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761 // throws SocketException if datagram was larger than buffer.
// // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
// throws SocketException if datagram was larger than buffer. int size = socket.ReceiveFrom(rawReceiveBuffer, 0, rawReceiveBuffer.Length, SocketFlags.None, ref newClientEP);
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 segment = new ArraySegment<byte>(rawReceiveBuffer, 0, size);
int size = socket.ReceiveFrom(rawReceiveBuffer, 0, rawReceiveBuffer.Length, SocketFlags.None, ref newClientEP);
segment = new ArraySegment<byte>(rawReceiveBuffer, 0, size);
// set connectionId to hash from endpoint // set connectionId to hash from endpoint
// NOTE: IPEndPoint.GetHashCode() allocates. // NOTE: IPEndPoint.GetHashCode() allocates.
// it calls m_Address.GetHashCode(). // it calls m_Address.GetHashCode().
// m_Address is an IPAddress. // m_Address is an IPAddress.
// GetHashCode() allocates for IPv6: // GetHashCode() allocates for IPv6:
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/IPAddress.cs#L699 // https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/IPAddress.cs#L699
// //
// => using only newClientEP.Port wouldn't work, because // => using only newClientEP.Port wouldn't work, because
// different connections can have the same port. // different connections can have the same port.
connectionId = newClientEP.GetHashCode(); connectionId = newClientEP.GetHashCode();
return true; return true;
}
} }
// this is fine, the socket might have been closed in the other end // for non-blocking sockets, Receive throws WouldBlock if there is
catch (SocketException ex) // no message to read. that's okay. only log for other errors.
catch (SocketException e)
{ {
// the other end closing the connection is not an 'error'. if (e.SocketErrorCode != SocketError.WouldBlock)
// but connections should never just end silently. {
// at least log a message for easier debugging. // NOTE: SocketException is not a subclass of IOException.
Log.Info($"KcpServer: poll & read failed: {ex}"); // the other end closing the connection is not an 'error'.
// but connections should never just end silently.
// at least log a message for easier debugging.
Log.Info($"KcpServer: ReceiveFrom failed: {e}");
}
// WouldBlock indicates there's no data yet, so return false.
return false;
} }
return false;
} }
// io - out. // io - out.
@ -205,7 +206,19 @@ protected virtual void RawSend(int connectionId, ArraySegment<byte> data)
// send to the the endpoint. // send to the the endpoint.
// do not send to 'newClientEP', as that's always reused. // do not send to 'newClientEP', as that's always reused.
// fixes https://github.com/MirrorNetworking/Mirror/issues/3296 // fixes https://github.com/MirrorNetworking/Mirror/issues/3296
socket.SendTo(data.Array, data.Offset, data.Count, SocketFlags.None, connection.remoteEndPoint); try
{
socket.SendTo(data.Array, data.Offset, data.Count, SocketFlags.None, connection.remoteEndPoint);
}
// for non-blocking sockets, SendTo may throw WouldBlock.
// in that case, simply drop the message. it's UDP, it's fine.
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
{
Log.Error($"KcpServer: SendTo failed: {e}");
}
}
} }
protected virtual KcpServerConnection CreateConnection(int connectionId) protected virtual KcpServerConnection CreateConnection(int connectionId)