kcp2k V1.21 [2022-11-24]

- high level refactor, part one.
  - KcpPeer instead of KcpConnection, KcpClientConnection, KcpServerConnection
  - RawSend/Receive can now easily be overwritten in KcpClient/Server.
    for non-alloc, relays, etc.
This commit is contained in:
vis2k 2022-11-24 10:49:52 +01:00
parent e1a5f0300f
commit d100ecb4c4
17 changed files with 400 additions and 465 deletions

View File

@ -150,8 +150,8 @@ void Awake()
void OnValidate()
{
// show max message sizes in inspector for convenience
ReliableMaxMessageSize = KcpConnection.ReliableMaxMessageSize(ReceiveWindowSize);
UnreliableMaxMessageSize = KcpConnection.UnreliableMaxMessageSize;
ReliableMaxMessageSize = KcpPeer.ReliableMaxMessageSize(ReceiveWindowSize);
UnreliableMaxMessageSize = KcpPeer.UnreliableMaxMessageSize;
}
// all except WebGL
@ -238,9 +238,9 @@ public override int GetMaxPacketSize(int channelId = Channels.Reliable)
switch (channelId)
{
case Channels.Unreliable:
return KcpConnection.UnreliableMaxMessageSize;
return KcpPeer.UnreliableMaxMessageSize;
default:
return KcpConnection.ReliableMaxMessageSize(ReceiveWindowSize);
return KcpPeer.ReliableMaxMessageSize(ReceiveWindowSize);
}
}
@ -253,27 +253,27 @@ public override int GetMaxPacketSize(int channelId = Channels.Reliable)
// => instead we always use MTU sized batches.
// => people can still send maxed size if needed.
public override int GetBatchThreshold(int channelId) =>
KcpConnection.UnreliableMaxMessageSize;
KcpPeer.UnreliableMaxMessageSize;
// server statistics
// LONG to avoid int overflows with connections.Sum.
// see also: https://github.com/vis2k/Mirror/pull/2777
public long GetAverageMaxSendRate() =>
server.connections.Count > 0
? server.connections.Values.Sum(conn => (long)conn.MaxSendRate) / server.connections.Count
? server.connections.Values.Sum(conn => (long)conn.peer.MaxSendRate) / server.connections.Count
: 0;
public long GetAverageMaxReceiveRate() =>
server.connections.Count > 0
? server.connections.Values.Sum(conn => (long)conn.MaxReceiveRate) / server.connections.Count
? server.connections.Values.Sum(conn => (long)conn.peer.MaxReceiveRate) / server.connections.Count
: 0;
long GetTotalSendQueue() =>
server.connections.Values.Sum(conn => conn.SendQueueCount);
server.connections.Values.Sum(conn => conn.peer.SendQueueCount);
long GetTotalReceiveQueue() =>
server.connections.Values.Sum(conn => conn.ReceiveQueueCount);
server.connections.Values.Sum(conn => conn.peer.ReceiveQueueCount);
long GetTotalSendBuffer() =>
server.connections.Values.Sum(conn => conn.SendBufferCount);
server.connections.Values.Sum(conn => conn.peer.SendBufferCount);
long GetTotalReceiveBuffer() =>
server.connections.Values.Sum(conn => conn.ReceiveBufferCount);
server.connections.Values.Sum(conn => conn.peer.ReceiveBufferCount);
// PrettyBytes function from DOTSNET
// pretty prints bytes as KB/MB/GB/etc.
@ -320,12 +320,12 @@ void OnGUI()
{
GUILayout.BeginVertical("Box");
GUILayout.Label("CLIENT");
GUILayout.Label($" MaxSendRate: {PrettyBytes(client.connection.MaxSendRate)}/s");
GUILayout.Label($" MaxRecvRate: {PrettyBytes(client.connection.MaxReceiveRate)}/s");
GUILayout.Label($" SendQueue: {client.connection.SendQueueCount}");
GUILayout.Label($" ReceiveQueue: {client.connection.ReceiveQueueCount}");
GUILayout.Label($" SendBuffer: {client.connection.SendBufferCount}");
GUILayout.Label($" ReceiveBuffer: {client.connection.ReceiveBufferCount}");
GUILayout.Label($" MaxSendRate: {PrettyBytes(client.peer.MaxSendRate)}/s");
GUILayout.Label($" MaxRecvRate: {PrettyBytes(client.peer.MaxReceiveRate)}/s");
GUILayout.Label($" SendQueue: {client.peer.SendQueueCount}");
GUILayout.Label($" ReceiveQueue: {client.peer.ReceiveQueueCount}");
GUILayout.Label($" SendBuffer: {client.peer.SendBufferCount}");
GUILayout.Label($" ReceiveBuffer: {client.peer.ReceiveBufferCount}");
GUILayout.EndVertical();
}
@ -351,12 +351,12 @@ void OnLogStatistics()
if (ClientConnected())
{
string log = "kcp CLIENT @ time: " + NetworkTime.localTime + "\n";
log += $" MaxSendRate: {PrettyBytes(client.connection.MaxSendRate)}/s\n";
log += $" MaxRecvRate: {PrettyBytes(client.connection.MaxReceiveRate)}/s\n";
log += $" SendQueue: {client.connection.SendQueueCount}\n";
log += $" ReceiveQueue: {client.connection.ReceiveQueueCount}\n";
log += $" SendBuffer: {client.connection.SendBufferCount}\n";
log += $" ReceiveBuffer: {client.connection.ReceiveBufferCount}\n\n";
log += $" MaxSendRate: {PrettyBytes(client.peer.MaxSendRate)}/s\n";
log += $" MaxRecvRate: {PrettyBytes(client.peer.MaxReceiveRate)}/s\n";
log += $" SendQueue: {client.peer.SendQueueCount}\n";
log += $" ReceiveQueue: {client.peer.ReceiveQueueCount}\n";
log += $" SendBuffer: {client.peer.SendBufferCount}\n";
log += $" ReceiveBuffer: {client.peer.ReceiveBufferCount}\n\n";
Debug.Log(log);
}
}

View File

@ -1,3 +1,9 @@
V1.21 [2022-11-24]
- high level refactor, part one.
- KcpPeer instead of KcpConnection, KcpClientConnection, KcpServerConnection
- RawSend/Receive can now easily be overwritten in KcpClient/Server.
for non-alloc, relays, etc.
V1.20 [2022-11-22]
- perf: KcpClient receive allocation was removed entirely.
reduces Mirror benchmark client sided allocations from 4.9 KB / 1.7 KB (non-alloc) to 0B.

View File

@ -21,5 +21,20 @@ public static bool ResolveHostname(string hostname, out IPAddress[] addresses)
return false;
}
}
// if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit.
public static void MaximizeSocketBuffers(Socket socket)
{
// log initial size for comparison.
// remember initial size for log comparison
int initialReceive = socket.ReceiveBufferSize;
int initialSend = socket.SendBufferSize;
socket.SetReceiveBufferToOSLimit();
socket.SetSendBufferToOSLimit();
Log.Info($"Kcp: RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x) maximized to OS limits!");
}
}
}

View File

@ -1,11 +1,27 @@
// kcp client logic abstracted into a class.
// for use in Mirror, DOTSNET, testing, etc.
using System;
using System.Net;
using System.Net.Sockets;
namespace kcp2k
{
public class KcpClient
{
// kcp
// public so that bandwidth statistics can be accessed from the outside
public KcpPeer peer;
// IO
protected Socket socket;
public EndPoint remoteEndPoint;
// raw receive buffer always needs to be of 'MTU' size, even if
// MaxMessageSize is larger. kcp always sends in MTU segments and having
// a buffer smaller than MTU would silently drop excess data.
// => we need the MTU to fit channel + message!
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
// events
public Action OnConnected;
public Action<ArraySegment<byte>, KcpChannel> OnData;
@ -16,7 +32,6 @@ public class KcpClient
public Action<ErrorCode, string> OnError;
// state
public KcpClientConnection connection;
public bool connected;
public KcpClient(Action OnConnected,
@ -31,11 +46,6 @@ public KcpClient(Action OnConnected,
this.OnError = OnError;
}
// CreateConnection can be overwritten for where-allocation:
// https://github.com/vis2k/where-allocation
protected virtual KcpClientConnection CreateConnection() =>
new KcpClientConnection();
public void Connect(string address,
ushort port,
bool noDelay,
@ -44,7 +54,7 @@ public void Connect(string address,
bool congestionWindow = true,
uint sendWindowSize = Kcp.WND_SND,
uint receiveWindowSize = Kcp.WND_RCV,
int timeout = KcpConnection.DEFAULT_TIMEOUT,
int timeout = KcpPeer.DEFAULT_TIMEOUT,
uint maxRetransmits = Kcp.DEADLINK,
bool maximizeSendReceiveBuffersToOSLimit = false)
{
@ -54,54 +64,117 @@ public void Connect(string address,
return;
}
// create connection
connection = CreateConnection();
// create fresh peer for each new session
peer = new KcpPeer(RawSend, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout, maxRetransmits);
// setup events
connection.OnAuthenticated = () =>
peer.OnAuthenticated = () =>
{
Log.Info($"KCP: OnClientConnected");
connected = true;
OnConnected();
};
connection.OnData = (message, channel) =>
peer.OnData = (message, channel) =>
{
//Log.Debug($"KCP: OnClientData({BitConverter.ToString(message.Array, message.Offset, message.Count)})");
OnData(message, channel);
};
connection.OnDisconnected = () =>
peer.OnDisconnected = () =>
{
Log.Info($"KCP: OnClientDisconnected");
connected = false;
connection = null;
peer = null;
socket?.Close();
socket = null;
remoteEndPoint = null;
OnDisconnected();
};
connection.OnError = (error, reason) =>
peer.OnError = (error, reason) =>
{
OnError(error, reason);
};
// connect
connection.Connect(address,
port,
noDelay,
interval,
fastResend,
congestionWindow,
sendWindowSize,
receiveWindowSize,
timeout,
maxRetransmits,
maximizeSendReceiveBuffersToOSLimit);
Log.Info($"KcpClient: connect to {address}:{port}");
// try resolve host name
if (!Common.ResolveHostname(address, out IPAddress[] addresses))
{
// pass error to user callback. no need to log it manually.
peer.OnError(ErrorCode.DnsResolve, $"Failed to resolve host: {address}");
peer.OnDisconnected();
return;
}
// create socket
remoteEndPoint = new IPEndPoint(addresses[0], port);
socket = new Socket(remoteEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
// configure buffer sizes:
// if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit.
if (maximizeSendReceiveBuffersToOSLimit)
{
Common.MaximizeSocketBuffers(socket);
}
// otherwise still log the defaults for info.
else Log.Info($"KcpClient: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(maximizeSendReceiveBuffersToOSLimit)} to increase it to OS limit. If they still drop, increase the OS limit.");
// bind to endpoint so we can use send/recv instead of sendto/recvfrom.
socket.Connect(remoteEndPoint);
// client should send handshake to server as very first message
peer.SendHandshake();
RawReceive();
}
// io - input.
// virtual so it may be modified for relays, etc.
protected virtual void RawReceive()
{
if (socket == null) return;
try
{
while (socket.Poll(0, SelectMode.SelectRead))
{
// ReceiveFrom allocates. we used bound Receive.
// returns amount of bytes written into buffer.
// throws SocketException if datagram was larger than buffer.
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
int msgLength = socket.Receive(rawReceiveBuffer);
//Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
peer.RawInput(rawReceiveBuffer, msgLength);
}
}
// this is fine, the socket might have been closed in the other end
catch (SocketException ex)
{
// the other end closing the connection is not an 'error'.
// but connections should never just end silently.
// at least log a message for easier debugging.
Log.Info($"KCP ClientConnection: looks like the other end has closed the connection. This is fine: {ex}");
peer.Disconnect();
}
}
// io - output.
// virtual so it may be modified for relays, etc.
protected virtual void RawSend(ArraySegment<byte> data)
{
socket.Send(data.Array, data.Offset, data.Count, SocketFlags.None);
}
public void Send(ArraySegment<byte> segment, KcpChannel channel)
{
if (connected)
if (!connected)
{
connection.SendData(segment, channel);
Log.Warning("KCP: can't send because client not connected!");
return;
}
else Log.Warning("KCP: can't send because client not connected!");
peer.SendData(segment, channel);
}
public void Disconnect()
@ -109,13 +182,12 @@ public void Disconnect()
// only if connected
// otherwise we end up in a deadlock because of an open Mirror bug:
// https://github.com/vis2k/Mirror/issues/2353
if (connected)
{
// call Disconnect and let the connection handle it.
// DO NOT set it to null yet. it needs to be updated a few more
// times first. let the connection handle it!
connection?.Disconnect();
}
if (!connected) return;
// call Disconnect and let the connection handle it.
// DO NOT set it to null yet. it needs to be updated a few more
// times first. let the connection handle it!
peer?.Disconnect();
}
// process incoming messages. should be called before updating the world.
@ -124,8 +196,11 @@ public void TickIncoming()
// recv on socket first, then process incoming
// (even if we didn't receive anything. need to tick ping etc.)
// (connection is null if not active)
connection?.RawReceive();
connection?.TickIncoming();
if (peer != null)
{
RawReceive();
peer.TickIncoming();
}
}
// process outgoing messages. should be called after updating the world.
@ -133,7 +208,7 @@ public void TickOutgoing()
{
// process outgoing
// (connection is null if not active)
connection?.TickOutgoing();
peer?.TickOutgoing();
}
// process incoming and outgoing for convenience

View File

@ -1,142 +0,0 @@
using System.Net;
using System.Net.Sockets;
namespace kcp2k
{
public class KcpClientConnection : KcpConnection
{
// IMPORTANT: raw receive buffer always needs to be of 'MTU' size, even
// if MaxMessageSize is larger. kcp always sends in MTU
// segments and having a buffer smaller than MTU would
// silently drop excess data.
// => we need the MTU to fit channel + message!
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
// EndPoint & Receive functions can be overwritten for where-allocation:
// https://github.com/vis2k/where-allocation
// NOTE: Client's SendTo doesn't allocate, don't need a virtual.
protected virtual void CreateRemoteEndPoint(IPAddress[] addresses, ushort port) =>
remoteEndPoint = new IPEndPoint(addresses[0], port);
// if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit.
void ConfigureSocketBufferSizes(bool maximizeSendReceiveBuffersToOSLimit)
{
if (maximizeSendReceiveBuffersToOSLimit)
{
// log initial size for comparison.
// remember initial size for log comparison
int initialReceive = socket.ReceiveBufferSize;
int initialSend = socket.SendBufferSize;
socket.SetReceiveBufferToOSLimit();
socket.SetSendBufferToOSLimit();
Log.Info($"KcpClient: RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x) increased to OS limits!");
}
// otherwise still log the defaults for info.
else Log.Info($"KcpClient: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(maximizeSendReceiveBuffersToOSLimit)} to increase it to OS limit. If they still drop, increase the OS limit.");
}
public void Connect(string host,
ushort port,
bool noDelay,
uint interval = Kcp.INTERVAL,
int fastResend = 0,
bool congestionWindow = true,
uint sendWindowSize = Kcp.WND_SND,
uint receiveWindowSize = Kcp.WND_RCV,
int timeout = DEFAULT_TIMEOUT,
uint maxRetransmits = Kcp.DEADLINK,
bool maximizeSendReceiveBuffersToOSLimit = false)
{
Log.Info($"KcpClient: connect to {host}:{port}");
// try resolve host name
if (Common.ResolveHostname(host, out IPAddress[] addresses))
{
// create remote endpoint
CreateRemoteEndPoint(addresses, port);
// create socket
socket = new Socket(remoteEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
// configure buffer sizes
ConfigureSocketBufferSizes(maximizeSendReceiveBuffersToOSLimit);
// connect
socket.Connect(remoteEndPoint);
// set up kcp
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout, maxRetransmits);
// client should send handshake to server as very first message
SendHandshake();
RawReceive();
}
// otherwise call OnDisconnected to let the user know.
else
{
// pass error to user callback. no need to log it manually.
OnError(ErrorCode.DnsResolve, $"Failed to resolve host: {host}");
OnDisconnected();
}
}
// call from transport update
public void RawReceive()
{
try
{
if (socket != null)
{
while (socket.Poll(0, SelectMode.SelectRead))
{
// ReceiveFrom allocates.
// use Connect() to bind the UDP socket to the end point.
// then we can use Receive() instead.
// socket.ReceiveFrom(buffer, ref remoteEndPoint);
int msgLength = socket.Receive(rawReceiveBuffer);
// IMPORTANT: detect if buffer was too small for the
// received msgLength. otherwise the excess
// data would be silently lost.
// (see ReceiveFrom documentation)
if (msgLength <= rawReceiveBuffer.Length)
{
//Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
RawInput(rawReceiveBuffer, msgLength);
}
else
{
// pass error to user callback. no need to log it manually.
OnError(ErrorCode.InvalidReceive, $"KCP ClientConnection: message of size {msgLength} does not fit into buffer of size {rawReceiveBuffer.Length}. The excess was silently dropped. Disconnecting.");
Disconnect();
}
}
}
}
// this is fine, the socket might have been closed in the other end
catch (SocketException ex)
{
// the other end closing the connection is not an 'error'.
// but connections should never just end silently.
// at least log a message for easier debugging.
Log.Info($"KCP ClientConnection: looks like the other end has closed the connection. This is fine: {ex}");
Disconnect();
}
}
protected override void Dispose()
{
socket.Close();
socket = null;
}
protected override void RawSend(byte[] data, int length)
{
socket.Send(data, length, SocketFlags.None);
}
}
}

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 96512e74aa8214a6faa8a412a7a07877
timeCreated: 1602601237

View File

@ -1,20 +1,26 @@
// Kcp Peer, similar to UDP Peer but wrapped with reliability, channels,
// timeouts, authentication, state, etc.
//
// still IO agnostic to work with udp, nonalloc, relays, native, etc.
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
namespace kcp2k
{
enum KcpState { Connected, Authenticated, Disconnected }
public abstract class KcpConnection
public class KcpPeer
{
protected Socket socket;
protected EndPoint remoteEndPoint;
// kcp reliability algorithm
internal Kcp kcp;
// kcp can have several different states, let's use a state machine
KcpState state = KcpState.Disconnected;
// IO agnostic
readonly Action<ArraySegment<byte>> RawSend;
// state: connected as soon as we create the peer.
// leftover from KcpConnection. remove it after refactoring later.
KcpState state = KcpState.Connected;
public Action OnAuthenticated;
public Action<ArraySegment<byte>, KcpChannel> OnData;
@ -27,7 +33,7 @@ public abstract class KcpConnection
// If we don't receive anything these many milliseconds
// then consider us disconnected
public const int DEFAULT_TIMEOUT = 10000;
public int timeout = DEFAULT_TIMEOUT;
public int timeout;
uint lastReceiveTime;
// internal time.
@ -79,16 +85,16 @@ public static int ReliableMaxMessageSize(uint rcv_wnd) =>
// buffer to receive kcp's processed messages (avoids allocations).
// IMPORTANT: this is for KCP messages. so it needs to be of size:
// 1 byte header + MaxMessageSize content
byte[] kcpMessageBuffer;// = new byte[1 + ReliableMaxMessageSize];
readonly byte[] kcpMessageBuffer;// = new byte[1 + ReliableMaxMessageSize];
// send buffer for handing user messages to kcp for processing.
// (avoids allocations).
// IMPORTANT: needs to be of size:
// 1 byte header + MaxMessageSize content
byte[] kcpSendBuffer;// = new byte[1 + ReliableMaxMessageSize];
readonly byte[] kcpSendBuffer;// = new byte[1 + ReliableMaxMessageSize];
// raw send buffer is exactly MTU.
byte[] rawSendBuffer = new byte[Kcp.MTU_DEF];
readonly byte[] rawSendBuffer = new byte[Kcp.MTU_DEF];
// send a ping occasionally so we don't time out on the other end.
// for example, creating a character in an MMO could easily take a
@ -133,10 +139,22 @@ public static int ReliableMaxMessageSize(uint rcv_wnd) =>
// => useful to start from a fresh state every time the client connects
// => NoDelay, interval, wnd size are the most important configurations.
// let's force require the parameters so we don't forget it anywhere.
protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT, uint maxRetransmits = Kcp.DEADLINK)
public KcpPeer(
Action<ArraySegment<byte>> output,
bool noDelay,
uint interval = Kcp.INTERVAL,
int fastResend = 0,
bool congestionWindow = true,
uint sendWindowSize = Kcp.WND_SND,
uint receiveWindowSize = Kcp.WND_RCV,
int timeout = DEFAULT_TIMEOUT,
uint maxRetransmits = Kcp.DEADLINK)
{
this.RawSend = output;
// set up kcp over reliable channel (that's what kcp is for)
kcp = new Kcp(0, RawSendReliable);
// set nodelay.
// note that kcp uses 'nocwnd' internally so we negate the parameter
kcp.SetNoDelay(noDelay ? 1u : 0u, interval, fastResend, !congestionWindow);
@ -154,10 +172,9 @@ protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastRese
// create message buffers AFTER window size is set
// see comments on buffer definition for the "+1" part
kcpMessageBuffer = new byte[1 + ReliableMaxMessageSize(receiveWindowSize)];
kcpSendBuffer = new byte[1 + ReliableMaxMessageSize(receiveWindowSize)];
kcpSendBuffer = new byte[1 + ReliableMaxMessageSize(receiveWindowSize)];
this.timeout = timeout;
state = KcpState.Connected;
refTime.Start();
}
@ -397,6 +414,7 @@ public void TickIncoming()
}
}
}
// TODO KcpConnection is IO agnostic. move this to outside later.
catch (SocketException exception)
{
// this is ok, the connection was closed
@ -445,6 +463,7 @@ public void TickOutgoing()
}
}
}
// TODO KcpConnection is IO agnostic. move this to outside later.
catch (SocketException exception)
{
// this is ok, the connection was closed
@ -549,16 +568,16 @@ public void RawInput(byte[] buffer, int msgLength)
}
}
// raw send puts the data into the socket
protected abstract void RawSend(byte[] data, int length);
// raw send called by kcp
void RawSendReliable(byte[] data, int length)
{
// copy channel header, data into raw send buffer, then send
rawSendBuffer[0] = (byte)KcpChannel.Reliable;
Buffer.BlockCopy(data, 0, rawSendBuffer, 1, length);
RawSend(rawSendBuffer, length + 1);
// IO send
ArraySegment<byte> segment = new ArraySegment<byte>(rawSendBuffer, 0, length + 1);
RawSend(segment);
}
void SendReliable(KcpHeader header, ArraySegment<byte> content)
@ -592,7 +611,10 @@ void SendUnreliable(ArraySegment<byte> message)
// copy channel header, data into raw send buffer, then send
rawSendBuffer[0] = (byte)KcpChannel.Unreliable;
Buffer.BlockCopy(message.Array, message.Offset, rawSendBuffer, 1, message.Count);
RawSend(rawSendBuffer, message.Count + 1);
// IO send
ArraySegment<byte> segment = new ArraySegment<byte>(rawSendBuffer, 0, message.Count + 1);
RawSend(segment);
}
// otherwise content is larger than MaxMessageSize. let user know!
// GetType() shows Server/ClientConn instead of just Connection.
@ -645,8 +667,6 @@ public void SendData(ArraySegment<byte> data, KcpChannel channel)
// disconnect info needs to be delivered, so it goes over reliable
void SendDisconnect() => SendReliable(KcpHeader.Disconnect, default);
protected virtual void Dispose() {}
// disconnect this connection
public void Disconnect()
{
@ -655,32 +675,25 @@ public void Disconnect()
return;
// send a disconnect message
//
// previously we checked socket.Connected here before SendDisconnect.
// but this only worked in Unity's mono version.
// in netcore, socket.Connected can't be used for UDP sockets.
// as it should, because there's no actual connection in UDP.
//if (socket.Connected)
//{
try
{
SendDisconnect();
kcp.Flush();
}
catch (SocketException)
{
// this is ok, the connection was already closed
}
catch (ObjectDisposedException)
{
// this is normal when we stop the server
// the socket is stopped so we can't send anything anymore
// to the clients
try
{
SendDisconnect();
kcp.Flush();
}
// TODO KcpConnection is IO agnostic. move this to outside later.
catch (SocketException)
{
// this is ok, the connection was already closed
}
catch (ObjectDisposedException)
{
// this is normal when we stop the server
// the socket is stopped so we can't send anything anymore
// to the clients
// the clients will eventually timeout and realize they
// were disconnected
}
//}
// the clients will eventually timeout and realize they
// were disconnected
}
// set as Disconnected, call event
// GetType() shows Server/ClientConn instead of just Connection.
@ -688,8 +701,5 @@ public void Disconnect()
state = KcpState.Disconnected;
OnDisconnected?.Invoke();
}
// get remote endpoint
public EndPoint GetRemoteEndPoint() => remoteEndPoint;
}
}

View File

@ -57,15 +57,15 @@ public class KcpServer
protected Socket socket;
EndPoint newClientEP;
// IMPORTANT: raw receive buffer always needs to be of 'MTU' size, even
// if MaxMessageSize is larger. kcp always sends in MTU
// segments and having a buffer smaller than MTU would
// silently drop excess data.
// => we need the mtu to fit channel + message!
// raw receive buffer always needs to be of 'MTU' size, even if
// MaxMessageSize is larger. kcp always sends in MTU segments and having
// a buffer smaller than MTU would silently drop excess data.
// => we need the mtu to fit channel + message!
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
// connections <connectionId, connection> where connectionId is EndPoint.GetHashCode
public Dictionary<int, KcpServerConnection> connections = new Dictionary<int, KcpServerConnection>();
public Dictionary<int, KcpServerConnection> connections =
new Dictionary<int, KcpServerConnection>();
public KcpServer(Action<int> OnConnected,
Action<int, ArraySegment<byte>, KcpChannel> OnData,
@ -78,7 +78,7 @@ public KcpServer(Action<int> OnConnected,
bool CongestionWindow = true,
uint SendWindowSize = Kcp.WND_SND,
uint ReceiveWindowSize = Kcp.WND_RCV,
int Timeout = KcpConnection.DEFAULT_TIMEOUT,
int Timeout = KcpPeer.DEFAULT_TIMEOUT,
uint MaxRetransmits = Kcp.DEADLINK,
bool MaximizeSendReceiveBuffersToOSLimit = false)
{
@ -105,31 +105,13 @@ public KcpServer(Action<int> OnConnected,
public bool IsActive() => socket != null;
// if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit.
void ConfigureSocketBufferSizes()
{
if (MaximizeSendReceiveBuffersToOSLimit)
{
// log initial size for comparison.
// remember initial size for log comparison
int initialReceive = socket.ReceiveBufferSize;
int initialSend = socket.SendBufferSize;
socket.SetReceiveBufferToOSLimit();
socket.SetSendBufferToOSLimit();
Log.Info($"KcpServer: RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x) increased to OS limits!");
}
// otherwise still log the defaults for info.
else Log.Info($"KcpServer: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(MaximizeSendReceiveBuffersToOSLimit)} to increase it to OS limit. If they still drop, increase the OS limit.");
}
public void Start(ushort port)
{
// only start once
if (socket != null)
{
Log.Warning("KCP: server already started!");
return;
}
// listen
@ -147,15 +129,22 @@ public void Start(ushort port)
socket.Bind(new IPEndPoint(IPAddress.Any, port));
}
// configure socket buffer size.
ConfigureSocketBufferSizes();
// configure buffer sizes:
// if connections drop under heavy load, increase to OS limit.
// if still not enough, increase the OS limit.
if (MaximizeSendReceiveBuffersToOSLimit)
{
Common.MaximizeSocketBuffers(socket);
}
// otherwise still log the defaults for info.
else Log.Info($"KcpServer: RecvBuf = {socket.ReceiveBufferSize} SendBuf = {socket.SendBufferSize}. If connections drop under heavy load, enable {nameof(MaximizeSendReceiveBuffersToOSLimit)} to increase it to OS limit. If they still drop, increase the OS limit.");
}
public void Send(int connectionId, ArraySegment<byte> segment, KcpChannel channel)
{
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
connection.SendData(segment, channel);
connection.peer.SendData(segment, channel);
}
}
@ -163,7 +152,7 @@ public void Disconnect(int connectionId)
{
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
connection.Disconnect();
connection.peer.Disconnect();
}
}
@ -172,14 +161,15 @@ public IPEndPoint GetClientEndPoint(int connectionId)
{
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
return (connection.GetRemoteEndPoint() as IPEndPoint);
return connection.remoteEndPoint as IPEndPoint;
}
return null;
}
// EndPoint & Receive functions can be overwritten for where-allocation:
// io - input.
// virtual so it may be modified for relays, nonalloc workaround, etc.
// https://github.com/vis2k/where-allocation
protected virtual int ReceiveFrom(byte[] buffer, out int connectionHash)
protected virtual int RawReceive(byte[] buffer, out int connectionHash)
{
// NOTE: ReceiveFrom allocates.
// we pass our IPEndPoint to ReceiveFrom.
@ -201,8 +191,138 @@ protected virtual int ReceiveFrom(byte[] buffer, out int connectionHash)
return read;
}
protected virtual KcpServerConnection CreateConnection() =>
new KcpServerConnection(socket, newClientEP, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
// io - out.
// virtual so it may be modified for relays, nonalloc workaround, etc.
protected virtual void RawSend(ArraySegment<byte> data, EndPoint remoteEndPoint)
{
socket.SendTo(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint);
}
protected virtual KcpServerConnection CreateConnection()
{
// attach EndPoint EP to RawSend.
// kcp needs a simple RawSend(byte[]) function.
Action<ArraySegment<byte>> RawSendWrap =
data => RawSend(data, newClientEP);
KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
return new KcpServerConnection(peer, newClientEP);
}
// receive + add + process once.
// best to call this as long as there is more data to receive.
void ProcessNext()
{
try
{
// receive from socket.
// returns amount of bytes written into buffer.
// throws SocketException if datagram was larger than buffer.
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
int msgLength = RawReceive(rawReceiveBuffer, out int connectionId);
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
// is this a new connection?
if (!connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
// create a new KcpConnection based on last received
// EndPoint. can be overwritten for where-allocation.
connection = CreateConnection();
// DO NOT add to connections yet. only if the first message
// is actually the kcp handshake. otherwise it's either:
// * random data from the internet
// * or from a client connection that we just disconnected
// but that hasn't realized it yet, still sending data
// from last session that we should absolutely ignore.
//
//
// TODO this allocates a new KcpConnection for each new
// internet connection. not ideal, but C# UDP Receive
// already allocated anyway.
//
// expecting a MAGIC byte[] would work, but sending the raw
// UDP message without kcp's reliability will have low
// probability of being received.
//
// for now, this is fine.
// setup authenticated event that also adds to connections
connection.peer.OnAuthenticated = () =>
{
// only send handshake to client AFTER we received his
// handshake in OnAuthenticated.
// we don't want to reply to random internet messages
// with handshakes each time.
connection.peer.SendHandshake();
// add to connections dict after being authenticated.
connections.Add(connectionId, connection);
Log.Info($"KCP: server added connection({connectionId})");
// setup Data + Disconnected events only AFTER the
// handshake. we don't want to fire OnServerDisconnected
// every time we receive invalid random data from the
// internet.
// setup data event
connection.peer.OnData = (message, channel) =>
{
// call mirror event
//Log.Info($"KCP: OnServerDataReceived({connectionId}, {BitConverter.ToString(message.Array, message.Offset, message.Count)})");
OnData.Invoke(connectionId, message, channel);
};
// setup disconnected event
connection.peer.OnDisconnected = () =>
{
// flag for removal
// (can't remove directly because connection is updated
// and event is called while iterating all connections)
connectionsToRemove.Add(connectionId);
// call mirror event
Log.Info($"KCP: OnServerDisconnected({connectionId})");
OnDisconnected(connectionId);
};
// setup error event
connection.peer.OnError = (error, reason) =>
{
OnError(connectionId, error, reason);
};
// finally, call mirror OnConnected event
Log.Info($"KCP: OnServerConnected({connectionId})");
OnConnected(connectionId);
};
// now input the message & process received ones
// connected event was set up.
// tick will process the first message and adds the
// connection if it was the handshake.
connection.peer.RawInput(rawReceiveBuffer, msgLength);
connection.peer.TickIncoming();
// again, do not add to connections.
// if the first message wasn't the kcp handshake then
// connection will simply be garbage collected.
}
// existing connection: simply input the message into kcp
else
{
connection.peer.RawInput(rawReceiveBuffer, msgLength);
}
}
// this is fine, the socket might have been closed in the other end
catch (SocketException ex)
{
// the other end closing the connection is not an 'error'.
// but connections should never just end silently.
// at least log a message for easier debugging.
Log.Info($"KCP ClientConnection: looks like the other end has closed the connection. This is fine: {ex}");
}
}
// process incoming messages. should be called before updating the world.
HashSet<int> connectionsToRemove = new HashSet<int>();
@ -210,131 +330,14 @@ public void TickIncoming()
{
while (socket != null && socket.Poll(0, SelectMode.SelectRead))
{
try
{
// receive
int msgLength = ReceiveFrom(rawReceiveBuffer, out int connectionId);
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
// IMPORTANT: detect if buffer was too small for the received
// msgLength. otherwise the excess data would be
// silently lost.
// (see ReceiveFrom documentation)
if (msgLength <= rawReceiveBuffer.Length)
{
// is this a new connection?
if (!connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
// create a new KcpConnection based on last received
// EndPoint. can be overwritten for where-allocation.
connection = CreateConnection();
// DO NOT add to connections yet. only if the first message
// is actually the kcp handshake. otherwise it's either:
// * random data from the internet
// * or from a client connection that we just disconnected
// but that hasn't realized it yet, still sending data
// from last session that we should absolutely ignore.
//
//
// TODO this allocates a new KcpConnection for each new
// internet connection. not ideal, but C# UDP Receive
// already allocated anyway.
//
// expecting a MAGIC byte[] would work, but sending the raw
// UDP message without kcp's reliability will have low
// probability of being received.
//
// for now, this is fine.
// setup authenticated event that also adds to connections
connection.OnAuthenticated = () =>
{
// only send handshake to client AFTER we received his
// handshake in OnAuthenticated.
// we don't want to reply to random internet messages
// with handshakes each time.
connection.SendHandshake();
// add to connections dict after being authenticated.
connections.Add(connectionId, connection);
Log.Info($"KCP: server added connection({connectionId})");
// setup Data + Disconnected events only AFTER the
// handshake. we don't want to fire OnServerDisconnected
// every time we receive invalid random data from the
// internet.
// setup data event
connection.OnData = (message, channel) =>
{
// call mirror event
//Log.Info($"KCP: OnServerDataReceived({connectionId}, {BitConverter.ToString(message.Array, message.Offset, message.Count)})");
OnData.Invoke(connectionId, message, channel);
};
// setup disconnected event
connection.OnDisconnected = () =>
{
// flag for removal
// (can't remove directly because connection is updated
// and event is called while iterating all connections)
connectionsToRemove.Add(connectionId);
// call mirror event
Log.Info($"KCP: OnServerDisconnected({connectionId})");
OnDisconnected(connectionId);
};
// setup error event
connection.OnError = (error, reason) =>
{
OnError(connectionId, error, reason);
};
// finally, call mirror OnConnected event
Log.Info($"KCP: OnServerConnected({connectionId})");
OnConnected(connectionId);
};
// now input the message & process received ones
// connected event was set up.
// tick will process the first message and adds the
// connection if it was the handshake.
connection.RawInput(rawReceiveBuffer, msgLength);
connection.TickIncoming();
// again, do not add to connections.
// if the first message wasn't the kcp handshake then
// connection will simply be garbage collected.
}
// existing connection: simply input the message into kcp
else
{
connection.RawInput(rawReceiveBuffer, msgLength);
}
}
else
{
Log.Error($"KCP Server: message of size {msgLength} does not fit into buffer of size {rawReceiveBuffer.Length}. The excess was silently dropped. Disconnecting connectionId={connectionId}.");
Disconnect(connectionId);
}
}
// this is fine, the socket might have been closed in the other end
catch (SocketException ex)
{
// the other end closing the connection is not an 'error'.
// but connections should never just end silently.
// at least log a message for easier debugging.
Log.Info($"KCP ClientConnection: looks like the other end has closed the connection. This is fine: {ex}");
}
ProcessNext();
}
// process inputs for all server connections
// (even if we didn't receive anything. need to tick ping etc.)
foreach (KcpServerConnection connection in connections.Values)
{
connection.TickIncoming();
connection.peer.TickIncoming();
}
// remove disconnected connections
@ -353,7 +356,7 @@ public void TickOutgoing()
// flush all server connections
foreach (KcpServerConnection connection in connections.Values)
{
connection.TickOutgoing();
connection.peer.TickOutgoing();
}
}

View File

@ -1,22 +1,19 @@
// server needs to store a separate KcpPeer for each connection.
// as well as remoteEndPoint so we know where to send data to.
using System.Net;
using System.Net.Sockets;
namespace kcp2k
{
public class KcpServerConnection : KcpConnection
// struct to avoid memory indirection
public struct KcpServerConnection
{
// Constructor & Send functions can be overwritten for where-allocation:
// https://github.com/vis2k/where-allocation
public KcpServerConnection(Socket socket, EndPoint remoteEndPoint, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT, uint maxRetransmits = Kcp.DEADLINK)
{
this.socket = socket;
this.remoteEndPoint = remoteEndPoint;
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout, maxRetransmits);
}
public readonly KcpPeer peer;
public readonly EndPoint remoteEndPoint;
protected override void RawSend(byte[] data, int length)
public KcpServerConnection(KcpPeer peer, EndPoint remoteEndPoint)
{
socket.SendTo(data, 0, length, SocketFlags.None, remoteEndPoint);
this.peer = peer;
this.remoteEndPoint = remoteEndPoint;
}
}
}

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 4c1b235bbe054706bef6d092f361006e
timeCreated: 1626430539

View File

@ -1 +0,0 @@
// removed 2022-11-22

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 2cf0ccf7d551480bb5af08fcbe169f84
timeCreated: 1626435264

View File

@ -1,25 +0,0 @@
// where-allocation version of KcpServerConnection.
// may not be wanted on all platforms, so it's an extra optional class.
using System.Net;
using System.Net.Sockets;
using WhereAllocation;
namespace kcp2k
{
public class KcpServerConnectionNonAlloc : KcpServerConnection
{
IPEndPointNonAlloc reusableSendEndPoint;
public KcpServerConnectionNonAlloc(Socket socket, EndPoint remoteEndpoint, IPEndPointNonAlloc reusableSendEndPoint, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT, uint maxRetransmits = Kcp.DEADLINK)
: base(socket, remoteEndpoint, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout, maxRetransmits)
{
this.reusableSendEndPoint = reusableSendEndPoint;
}
protected override void RawSend(byte[] data, int length)
{
// where-allocation nonalloc send
socket.SendTo_NonAlloc(data, 0, length, SocketFlags.None, reusableSendEndPoint);
}
}
}

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 4e1b74cc224b4c83a0f6c8d8da9090ab
timeCreated: 1626430608

View File

@ -9,7 +9,7 @@ namespace kcp2k
{
public class KcpServerNonAlloc : KcpServer
{
IPEndPointNonAlloc reusableClientEP;
readonly IPEndPointNonAlloc reusableClientEP;
public KcpServerNonAlloc(Action<int> OnConnected,
Action<int, ArraySegment<byte>, KcpChannel> OnData,
@ -22,7 +22,7 @@ public KcpServerNonAlloc(Action<int> OnConnected,
bool CongestionWindow = true,
uint SendWindowSize = Kcp.WND_SND,
uint ReceiveWindowSize = Kcp.WND_RCV,
int Timeout = KcpConnection.DEFAULT_TIMEOUT,
int Timeout = KcpPeer.DEFAULT_TIMEOUT,
uint MaxRetransmits = Kcp.DEADLINK,
bool MaximizeSendReceiveBuffersToOSLimit = false)
: base(OnConnected,
@ -46,7 +46,7 @@ public KcpServerNonAlloc(Action<int> OnConnected,
: new IPEndPointNonAlloc(IPAddress.Any, 0);
}
protected override int ReceiveFrom(byte[] buffer, out int connectionHash)
protected override int RawReceive(byte[] buffer, out int connectionHash)
{
// where-allocation nonalloc ReceiveFrom.
int read = socket.ReceiveFrom_NonAlloc(buffer, 0, buffer.Length, SocketFlags.None, reusableClientEP);
@ -57,6 +57,13 @@ protected override int ReceiveFrom(byte[] buffer, out int connectionHash)
return read;
}
// make sure to pass IPEndPointNonAlloc as remoteEndPoint
protected override void RawSend(ArraySegment<byte> data, EndPoint remoteEndPoint)
{
// where-allocation nonalloc send
socket.SendTo_NonAlloc(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint as IPEndPointNonAlloc);
}
protected override KcpServerConnection CreateConnection()
{
// IPEndPointNonAlloc is reused all the time.
@ -68,10 +75,13 @@ protected override KcpServerConnection CreateConnection()
// IPEndPointNonAlloc...
IPEndPointNonAlloc reusableSendEP = new IPEndPointNonAlloc(newClientEP.Address, newClientEP.Port);
// create a new KcpConnection NonAlloc version
// -> where-allocation IPEndPointNonAlloc is reused.
// need to create a new one from the temp address.
return new KcpServerConnectionNonAlloc(socket, newClientEP, reusableSendEP, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
// attach reusable EP to RawSend.
// kcp needs a simple RawSend(byte[]) function.
Action<ArraySegment<byte>> RawSendWrap =
data => RawSend(data, reusableSendEP);
KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
return new KcpServerConnection(peer, newClientEP);
}
}
}