fix: kcp2k V1.10

- feature: configurable Timeout
- allocations explained with comments (C# ReceiveFrom / IPEndPoint.GetHashCode)
- fix: #17 KcpConnection.ReceiveNextReliable now assigns message default so it
  works in .net too
- fix: Segment pool is not static anymore. Each kcp instance now has it's own
  Pool<Segment>. fixes #18 concurrency issues
This commit is contained in:
vis2k 2021-05-28 16:32:51 +08:00
parent 3abaa70f0f
commit 10c39a59cc
10 changed files with 112 additions and 45 deletions

View File

@ -20,6 +20,9 @@ public class KcpTransport : Transport
public bool NoDelay = true;
[Tooltip("KCP internal update interval. 100ms is KCP default, but a lower interval is recommended to minimize latency and to scale to more networked entities.")]
public uint Interval = 10;
[Tooltip("KCP timeout in milliseconds. Note that KCP sends a ping automatically.")]
public int Timeout = 10000;
[Header("Advanced")]
[Tooltip("KCP fastresend parameter. Faster resend for the cost of higher bandwidth. 0 in normal mode, 2 in turbo mode.")]
public int FastResend = 2;
@ -71,7 +74,8 @@ void Awake()
FastResend,
CongestionWindow,
SendWindowSize,
ReceiveWindowSize
ReceiveWindowSize,
Timeout
);
if (statisticsLog)
@ -88,7 +92,7 @@ public override bool Available() =>
public override bool ClientConnected() => client.connected;
public override void ClientConnect(string address)
{
client.Connect(address, Port, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize);
client.Connect(address, Port, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout);
}
public override void ClientSend(ArraySegment<byte> segment, int channelId)
{

View File

@ -22,7 +22,7 @@ public KcpClient(Action OnConnected, Action<ArraySegment<byte>> OnData, Action O
this.OnDisconnected = OnDisconnected;
}
public void Connect(string address, ushort port, bool noDelay, uint interval, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
public void Connect(string address, ushort port, bool noDelay, uint interval, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = KcpConnection.DEFAULT_TIMEOUT)
{
if (connected)
{
@ -53,7 +53,7 @@ public void Connect(string address, ushort port, bool noDelay, uint interval, in
};
// connect
connection.Connect(address, port, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
connection.Connect(address, port, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout);
}
public void Send(ArraySegment<byte> segment, KcpChannel channel)

View File

@ -12,7 +12,7 @@ public class KcpClientConnection : KcpConnection
// => we need the MTU to fit channel + message!
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
public void Connect(string host, ushort port, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
public void Connect(string host, ushort port, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT)
{
Log.Info($"KcpClient: connect to {host}:{port}");
IPAddress[] ipAddress = Dns.GetHostAddresses(host);
@ -22,7 +22,7 @@ public void Connect(string host, ushort port, bool noDelay, uint interval = Kcp.
remoteEndpoint = new IPEndPoint(ipAddress[0], port);
socket = new Socket(remoteEndpoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
socket.Connect(remoteEndpoint);
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout);
// client should send handshake to server as very first message
SendHandshake();

View File

@ -28,7 +28,8 @@ public abstract class KcpConnection
// If we don't receive anything these many milliseconds
// then consider us disconnected
public const int TIMEOUT = 10000;
public const int DEFAULT_TIMEOUT = 10000;
public int timeout = DEFAULT_TIMEOUT;
uint lastReceiveTime;
// internal time.
@ -123,9 +124,11 @@ public abstract class KcpConnection
public uint MaxReceiveRate =>
kcp.rcv_wnd * kcp.mtu * 1000 / kcp.interval;
// NoDelay, interval, window size are the most important configurations.
// let's force require the parameters so we don't forget it anywhere.
protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
// SetupKcp creates and configures a new KCP instance.
// => useful to start from a fresh state every time the client connects
// => NoDelay, interval, wnd size are the most important configurations.
// let's force require the parameters so we don't forget it anywhere.
protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT)
{
// set up kcp over reliable channel (that's what kcp is for)
kcp = new Kcp(0, RawSendReliable);
@ -140,6 +143,7 @@ protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastRese
// message afterwards.
kcp.SetMtu(Kcp.MTU_DEF - CHANNEL_HEADER_SIZE);
this.timeout = timeout;
state = KcpState.Connected;
refTime.Start();
@ -149,9 +153,9 @@ void HandleTimeout(uint time)
{
// note: we are also sending a ping regularly, so timeout should
// only ever happen if the connection is truly gone.
if (time >= lastReceiveTime + TIMEOUT)
if (time >= lastReceiveTime + timeout)
{
Log.Warning($"KCP: Connection timed out after not receiving any message for {TIMEOUT}ms. Disconnecting.");
Log.Warning($"KCP: Connection timed out after not receiving any message for {timeout}ms. Disconnecting.");
Disconnect();
}
}
@ -240,6 +244,7 @@ bool ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message)
}
}
message = default;
header = KcpHeader.Disconnect;
return false;
}

View File

@ -36,6 +36,8 @@ public class KcpServer
// 8192, 8192 for 20k monsters
public uint SendWindowSize;
public uint ReceiveWindowSize;
// timeout in milliseconds
public int Timeout;
// state
Socket socket;
@ -63,7 +65,8 @@ public KcpServer(Action<int> OnConnected,
int FastResend = 0,
bool CongestionWindow = true,
uint SendWindowSize = Kcp.WND_SND,
uint ReceiveWindowSize = Kcp.WND_RCV)
uint ReceiveWindowSize = Kcp.WND_RCV,
int Timeout = KcpConnection.DEFAULT_TIMEOUT)
{
this.OnConnected = OnConnected;
this.OnData = OnData;
@ -74,6 +77,7 @@ public KcpServer(Action<int> OnConnected,
this.CongestionWindow = CongestionWindow;
this.SendWindowSize = SendWindowSize;
this.ReceiveWindowSize = ReceiveWindowSize;
this.Timeout = Timeout;
}
public bool IsActive() => socket != null;
@ -131,10 +135,23 @@ public void TickIncoming()
{
try
{
// NOTE: ReceiveFrom allocates.
// we pass our IPEndPoint to ReceiveFrom.
// receive from calls newClientEP.Create(socketAddr).
// IPEndPoint.Create always returns a new IPEndPoint.
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761
int msgLength = socket.ReceiveFrom(rawReceiveBuffer, 0, rawReceiveBuffer.Length, SocketFlags.None, ref newClientEP);
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
// calculate connectionId from endpoint
// NOTE: IPEndPoint.GetHashCode() allocates.
// it calls m_Address.GetHashCode().
// m_Address is an IPAddress.
// GetHashCode() allocates for IPv6:
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/IPAddress.cs#L699
//
// => using only newClientEP.Port wouldn't work, because
// different connections can have the same port.
int connectionId = newClientEP.GetHashCode();
// IMPORTANT: detect if buffer was too small for the received
@ -147,7 +164,7 @@ public void TickIncoming()
if (!connections.TryGetValue(connectionId, out KcpServerConnection connection))
{
// create a new KcpConnection
connection = new KcpServerConnection(socket, newClientEP, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize);
connection = new KcpServerConnection(socket, newClientEP, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout);
// DO NOT add to connections yet. only if the first message
// is actually the kcp handshake. otherwise it's either:

View File

@ -5,11 +5,11 @@ namespace kcp2k
{
public class KcpServerConnection : KcpConnection
{
public KcpServerConnection(Socket socket, EndPoint remoteEndpoint, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
public KcpServerConnection(Socket socket, EndPoint remoteEndpoint, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV, int timeout = DEFAULT_TIMEOUT)
{
this.socket = socket;
this.remoteEndpoint = remoteEndpoint;
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize, timeout);
}
protected override void RawSend(byte[] data, int length)

View File

@ -87,6 +87,17 @@ internal struct AckItem
// get how many packet is waiting to be sent
public int WaitSnd => snd_buf.Count + snd_queue.Count;
// segment pool to avoid allocations in C#.
// this is not part of the original C code.
readonly Pool<Segment> SegmentPool = new Pool<Segment>(
// create new segment
() => new Segment(),
// reset segment before reuse
(segment) => segment.Reset(),
// initial capacity
32
);
// ikcp_create
// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection.
@ -112,18 +123,12 @@ public Kcp(uint conv, Action<byte[], int> output)
// ikcp_segment_new
// we keep the original function and add our pooling to it.
// this way we'll never miss it anywhere.
static Segment SegmentNew()
{
return Segment.Take();
}
Segment SegmentNew() => SegmentPool.Take();
// ikcp_segment_delete
// we keep the original function and add our pooling to it.
// this way we'll never miss it anywhere.
static void SegmentDelete(Segment seg)
{
Segment.Return(seg);
}
void SegmentDelete(Segment seg) => SegmentPool.Return(seg);
// ikcp_recv
// receive data from kcp state machine

View File

@ -0,0 +1,46 @@
// Pool to avoid allocations (from libuv2k & Mirror)
using System;
using System.Collections.Generic;
namespace kcp2k
{
public class Pool<T>
{
// Mirror is single threaded, no need for concurrent collections
readonly Stack<T> objects = new Stack<T>();
// some types might need additional parameters in their constructor, so
// we use a Func<T> generator
readonly Func<T> objectGenerator;
// some types might need additional cleanup for returned objects
readonly Action<T> objectResetter;
public Pool(Func<T> objectGenerator, Action<T> objectResetter, int initialCapacity)
{
this.objectGenerator = objectGenerator;
this.objectResetter = objectResetter;
// allocate an initial pool so we have fewer (if any)
// allocations in the first few frames (or seconds).
for (int i = 0; i < initialCapacity; ++i)
objects.Push(objectGenerator());
}
// take an element from the pool, or create a new one if empty
public T Take() => objects.Count > 0 ? objects.Pop() : objectGenerator();
// return an element to the pool
public void Return(T item)
{
objectResetter(item);
objects.Push(item);
}
// clear the pool
public void Clear() => objects.Clear();
// count to see how many objects are in the pool. useful for tests.
public int Count => objects.Count;
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 35c07818fc4784bb4ba472c8e5029002
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,4 +1,3 @@
using System.Collections.Generic;
using System.IO;
namespace kcp2k
@ -22,26 +21,6 @@ internal class Segment
// note: no need to pool it, because Segment is already pooled.
internal MemoryStream data = new MemoryStream();
// pool ////////////////////////////////////////////////////////////////
internal static readonly Stack<Segment> Pool = new Stack<Segment>(32);
public static Segment Take()
{
if (Pool.Count > 0)
{
Segment seg = Pool.Pop();
return seg;
}
return new Segment();
}
public static void Return(Segment seg)
{
seg.Reset();
Pool.Push(seg);
}
////////////////////////////////////////////////////////////////////////
// ikcp_encode_seg
// encode a segment into buffer
internal int Encode(byte[] ptr, int offset)