From f0aae71f16a32342f6928cd770c86183a85aafd3 Mon Sep 17 00:00:00 2001 From: vis2k Date: Thu, 24 Nov 2022 18:25:23 +0100 Subject: [PATCH] kcp2k V1.22 --- .../Mirror/Transports/KCP/kcp2k/VERSION.txt | 3 ++ .../KCP/kcp2k/highlevel/KcpClient.cs | 2 +- .../Transports/KCP/kcp2k/highlevel/KcpPeer.cs | 15 +++--- .../KCP/kcp2k/highlevel/KcpServer.cs | 52 ++++++++++++------- .../highlevel/NonAlloc/KcpServerNonAlloc.cs | 14 ++--- 5 files changed, 53 insertions(+), 33 deletions(-) diff --git a/Assets/Mirror/Transports/KCP/kcp2k/VERSION.txt b/Assets/Mirror/Transports/KCP/kcp2k/VERSION.txt index 1eab700a6..487aa4cf9 100755 --- a/Assets/Mirror/Transports/KCP/kcp2k/VERSION.txt +++ b/Assets/Mirror/Transports/KCP/kcp2k/VERSION.txt @@ -1,3 +1,6 @@ +V1.22 [2022-11-24] +- KcpPeer.RawInput: add offset parameter + V1.21 [2022-11-24] - high level refactor, part one. - KcpPeer instead of KcpConnection, KcpClientConnection, KcpServerConnection diff --git a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpClient.cs b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpClient.cs index 02884a36b..e3e58b847 100644 --- a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpClient.cs +++ b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpClient.cs @@ -145,7 +145,7 @@ protected virtual void RawReceive() int msgLength = socket.Receive(rawReceiveBuffer); //Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}"); - peer.RawInput(rawReceiveBuffer, msgLength); + peer.RawInput(rawReceiveBuffer, 0, msgLength); } } // this is fine, the socket might have been closed in the other end diff --git a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpPeer.cs b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpPeer.cs index f60c20209..ad123c482 100644 --- a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpPeer.cs +++ b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpPeer.cs @@ -490,22 +490,25 @@ public void TickOutgoing() } } - public void RawInput(byte[] buffer, int msgLength) + // insert raw IO. usually from socket.Receive. + // offset is useful for relays, where we may parse a header and then + // feed the rest to kcp. + public void RawInput(byte[] buffer, int offset, int size) { // parse channel - if (msgLength > 0) + if (size > 0) { - byte channel = buffer[0]; + byte channel = buffer[offset + 0]; switch (channel) { case (byte)KcpChannel.Reliable: { // input into kcp, but skip channel byte - int input = kcp.Input(buffer, 1, msgLength - 1); + int input = kcp.Input(buffer, offset + 1, size - 1); if (input != 0) { // GetType() shows Server/ClientConn instead of just Connection. - Log.Warning($"{GetType()}: Input failed with error={input} for buffer with length={msgLength - 1}"); + Log.Warning($"{GetType()}: Input failed with error={input} for buffer with length={size - 1}"); } break; } @@ -532,7 +535,7 @@ public void RawInput(byte[] buffer, int msgLength) // the current state allows it. if (state == KcpState.Authenticated) { - ArraySegment message = new ArraySegment(buffer, 1, msgLength - 1); + ArraySegment message = new ArraySegment(buffer, offset + 1, size - 1); OnData?.Invoke(message, KcpChannel.Unreliable); // set last receive time to avoid timeout. diff --git a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpServer.cs b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpServer.cs index 3c5b4abfd..c712df424 100644 --- a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpServer.cs +++ b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/KcpServer.cs @@ -103,9 +103,9 @@ public KcpServer(Action OnConnected, : new IPEndPoint(IPAddress.Any, 0); } - public bool IsActive() => socket != null; + public virtual bool IsActive() => socket != null; - public void Start(ushort port) + public virtual void Start(ushort port) { // only start once if (socket != null) @@ -166,19 +166,28 @@ public IPEndPoint GetClientEndPoint(int connectionId) return null; } + // io - poll. + // return true if there is data to read. + // after which RawReceive will be called. + // virtual because for relays, + protected virtual bool RawPoll() => + socket != null && socket.Poll(0, SelectMode.SelectRead); + // io - input. // virtual so it may be modified for relays, nonalloc workaround, etc. // https://github.com/vis2k/where-allocation - protected virtual int RawReceive(byte[] buffer, out int connectionHash) + // bool return because not all receives may be valid. + // for example, relay may expect a certain header. + protected virtual bool RawReceive(byte[] buffer, out int size, out int connectionId) { // NOTE: ReceiveFrom allocates. // we pass our IPEndPoint to ReceiveFrom. // receive from calls newClientEP.Create(socketAddr). // IPEndPoint.Create always returns a new IPEndPoint. // https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761 - int read = socket.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref newClientEP); + size = socket.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref newClientEP); - // calculate connectionHash from endpoint + // set connectionId to hash from endpoint // NOTE: IPEndPoint.GetHashCode() allocates. // it calls m_Address.GetHashCode(). // m_Address is an IPAddress. @@ -187,23 +196,24 @@ protected virtual int RawReceive(byte[] buffer, out int connectionHash) // // => using only newClientEP.Port wouldn't work, because // different connections can have the same port. - connectionHash = newClientEP.GetHashCode(); - return read; + connectionId = newClientEP.GetHashCode(); + return true; } // io - out. // virtual so it may be modified for relays, nonalloc workaround, etc. - protected virtual void RawSend(ArraySegment data, EndPoint remoteEndPoint) + // relays may need to prefix connId (and remoteEndPoint would be same for all) + protected virtual void RawSend(int connectionId, ArraySegment data, EndPoint remoteEndPoint) { socket.SendTo(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint); } - protected virtual KcpServerConnection CreateConnection() + protected virtual KcpServerConnection CreateConnection(int connectionId) { // attach EndPoint EP to RawSend. // kcp needs a simple RawSend(byte[]) function. Action> RawSendWrap = - data => RawSend(data, newClientEP); + data => RawSend(connectionId, data, newClientEP); KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits); return new KcpServerConnection(peer, newClientEP); @@ -219,7 +229,9 @@ void ProcessNext() // returns amount of bytes written into buffer. // throws SocketException if datagram was larger than buffer. // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 - int msgLength = RawReceive(rawReceiveBuffer, out int connectionId); + if (!RawReceive(rawReceiveBuffer, out int size, out int connectionId)) + return; + //Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}"); // is this a new connection? @@ -227,7 +239,7 @@ void ProcessNext() { // create a new KcpConnection based on last received // EndPoint. can be overwritten for where-allocation. - connection = CreateConnection(); + connection = CreateConnection(connectionId); // DO NOT add to connections yet. only if the first message // is actually the kcp handshake. otherwise it's either: @@ -301,7 +313,7 @@ void ProcessNext() // connected event was set up. // tick will process the first message and adds the // connection if it was the handshake. - connection.peer.RawInput(rawReceiveBuffer, msgLength); + connection.peer.RawInput(rawReceiveBuffer, 0, size); connection.peer.TickIncoming(); // again, do not add to connections. @@ -311,7 +323,7 @@ void ProcessNext() // existing connection: simply input the message into kcp else { - connection.peer.RawInput(rawReceiveBuffer, msgLength); + connection.peer.RawInput(rawReceiveBuffer, 0, size); } } // this is fine, the socket might have been closed in the other end @@ -325,10 +337,11 @@ void ProcessNext() } // process incoming messages. should be called before updating the world. + // virtual because relay may need to inject their own ping or similar. HashSet connectionsToRemove = new HashSet(); - public void TickIncoming() + public virtual void TickIncoming() { - while (socket != null && socket.Poll(0, SelectMode.SelectRead)) + while (RawPoll()) { ProcessNext(); } @@ -351,7 +364,8 @@ public void TickIncoming() } // process outgoing messages. should be called after updating the world. - public void TickOutgoing() + // virtual because relay may need to inject their own ping or similar. + public virtual void TickOutgoing() { // flush all server connections foreach (KcpServerConnection connection in connections.Values) @@ -363,13 +377,13 @@ public void TickOutgoing() // process incoming and outgoing for convenience. // => ideally call ProcessIncoming() before updating the world and // ProcessOutgoing() after updating the world for minimum latency - public void Tick() + public virtual void Tick() { TickIncoming(); TickOutgoing(); } - public void Stop() + public virtual void Stop() { socket?.Close(); socket = null; diff --git a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/NonAlloc/KcpServerNonAlloc.cs b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/NonAlloc/KcpServerNonAlloc.cs index 19fd50b02..8d56db2e6 100644 --- a/Assets/Mirror/Transports/KCP/kcp2k/highlevel/NonAlloc/KcpServerNonAlloc.cs +++ b/Assets/Mirror/Transports/KCP/kcp2k/highlevel/NonAlloc/KcpServerNonAlloc.cs @@ -46,25 +46,25 @@ public KcpServerNonAlloc(Action OnConnected, : new IPEndPointNonAlloc(IPAddress.Any, 0); } - protected override int RawReceive(byte[] buffer, out int connectionHash) + protected override bool RawReceive(byte[] buffer, out int size, out int connectionId) { // where-allocation nonalloc ReceiveFrom. - int read = socket.ReceiveFrom_NonAlloc(buffer, 0, buffer.Length, SocketFlags.None, reusableClientEP); + size = socket.ReceiveFrom_NonAlloc(buffer, 0, buffer.Length, SocketFlags.None, reusableClientEP); SocketAddress remoteAddress = reusableClientEP.temp; // where-allocation nonalloc GetHashCode - connectionHash = remoteAddress.GetHashCode(); - return read; + connectionId = remoteAddress.GetHashCode(); + return true; } // make sure to pass IPEndPointNonAlloc as remoteEndPoint - protected override void RawSend(ArraySegment data, EndPoint remoteEndPoint) + protected override void RawSend(int connectionId, ArraySegment data, EndPoint remoteEndPoint) { // where-allocation nonalloc send socket.SendTo_NonAlloc(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint as IPEndPointNonAlloc); } - protected override KcpServerConnection CreateConnection() + protected override KcpServerConnection CreateConnection(int connectionId) { // IPEndPointNonAlloc is reused all the time. // we can't store that as the connection's endpoint. @@ -78,7 +78,7 @@ protected override KcpServerConnection CreateConnection() // attach reusable EP to RawSend. // kcp needs a simple RawSend(byte[]) function. Action> RawSendWrap = - data => RawSend(data, reusableSendEP); + data => RawSend(connectionId, data, reusableSendEP); KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits); return new KcpServerConnection(peer, newClientEP);