diff --git a/Assets/Mirror/Examples/Benchmark/Scenes/Scene.unity b/Assets/Mirror/Examples/Benchmark/Scenes/Scene.unity index 7a76df0e5..1abb7bbf9 100644 --- a/Assets/Mirror/Examples/Benchmark/Scenes/Scene.unity +++ b/Assets/Mirror/Examples/Benchmark/Scenes/Scene.unity @@ -268,8 +268,8 @@ GameObject: - component: {fileID: 1282001518} - component: {fileID: 1282001520} - component: {fileID: 1282001519} - - component: {fileID: 1282001521} - component: {fileID: 1282001522} + - component: {fileID: 1282001521} m_Layer: 0 m_Name: NetworkManager m_TagString: Untagged @@ -347,7 +347,7 @@ MonoBehaviour: m_GameObject: {fileID: 1282001517} m_Enabled: 1 m_EditorHideFlags: 0 - m_Script: {fileID: 11500000, guid: c7424c1070fad4ba2a7a96b02fbeb4bb, type: 3} + m_Script: {fileID: 11500000, guid: 6b0fecffa3f624585964b0d0eb21b18e, type: 3} m_Name: m_EditorClassIdentifier: OnClientConnected: @@ -374,12 +374,9 @@ MonoBehaviour: OnServerDisconnected: m_PersistentCalls: m_Calls: [] - port: 7777 + Port: 7777 NoDelay: 1 - serverMaxMessageSize: 16384 - serverMaxReceivesPerTick: 10000 - clientMaxMessageSize: 16384 - clientMaxReceivesPerTick: 1000 + Interval: 10 --- !u!114 &1282001522 MonoBehaviour: m_ObjectHideFlags: 0 diff --git a/Assets/Mirror/Runtime/Transport/KCP.meta b/Assets/Mirror/Runtime/Transport/KCP.meta new file mode 100644 index 000000000..c044a3b24 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 8f38345a2b94c4b5ca63a775eaad5584 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef b/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef new file mode 100755 index 000000000..057baaae1 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef @@ -0,0 +1,16 @@ +{ + "name": "KCP", + "references": [ + "GUID:30817c1a0e6d646d99c048fc403f5979", + "GUID:f51ebe6a0ceec4240a699833d6309b23" + ], + "includePlatforms": [], + "excludePlatforms": [], + "allowUnsafeCode": true, + "overrideReferences": false, + "precompiledReferences": [], + "autoReferenced": true, + "defineConstraints": [], + "versionDefines": [], + "noEngineReferences": false +} \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef.meta b/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef.meta new file mode 100644 index 000000000..1f0470b8d --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/KCP.asmdef.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: f34a216d4b31d4eb4872b2f30c5a6a11 +AssemblyDefinitionImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport.meta b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport.meta new file mode 100644 index 000000000..dedea2fb5 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 7bdb797750d0a490684410110bf48192 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs new file mode 100644 index 000000000..b84e407eb --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs @@ -0,0 +1,321 @@ +#if MIRROR +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using UnityEngine; +using kcp2k; + +namespace Mirror.KCP +{ + public class KcpTransport : Transport + { + // common + [Header("Transport Configuration")] + public ushort Port = 7777; + [Tooltip("NoDelay is recommended to reduce latency. This also scales better without buffers getting full.")] + public bool NoDelay = true; + [Tooltip("KCP internal update interval. 100ms is KCP default, but a lower interval is recommended to minimize latency and to scale to more networked entities.")] + public uint Interval = 10; + readonly byte[] buffer = new byte[Kcp.MTU_DEF]; + + // server + Socket serverSocket; + EndPoint serverNewClientEP = new IPEndPoint(IPAddress.IPv6Any, 0); + // connections where connectionId is EndPoint.GetHashCode + Dictionary connections = new Dictionary(); + + // client + KcpClientConnection clientConnection; + bool clientConnected; + + void Awake() + { + Debug.Log("KcpTransport initialized!"); + } + + // all except WebGL + public override bool Available() => + Application.platform != RuntimePlatform.WebGLPlayer; + + // use same Kcp configuration on server and client + void ConfigureKcpConnection(KcpConnection connection) + { + // TODO consider lower interval IF interval matters in nodelay mode + + // we did this in previous test + connection.kcp.SetNoDelay(1, 10, 2, true); + + // this works for 4k: + //connection.kcp.SetWindowSize(128, 128); + // this works for 10k: + connection.kcp.SetWindowSize(512, 512); + // this works for 20k: + //connection.kcp.SetWindowSize(8192, 8192); + } + + // client + public override bool ClientConnected() => clientConnection != null; + public override void ClientConnect(string address) + { + if (clientConnected) + { + Debug.LogWarning("KCP: client already connected!"); + return; + } + + clientConnection = new KcpClientConnection(); + // setup events + clientConnection.OnConnected += () => + { + Debug.Log($"KCP: OnClientConnected"); + clientConnected = true; + OnClientConnected.Invoke(); + }; + clientConnection.OnData += (message) => + { + //Debug.Log($"KCP: OnClientDataReceived({BitConverter.ToString(message.Array, message.Offset, message.Count)})"); + OnClientDataReceived.Invoke(message); + }; + clientConnection.OnDisconnected += () => + { + Debug.Log($"KCP: OnClientDisconnected"); + clientConnected = false; + OnClientDisconnected.Invoke(); + }; + + // connect + clientConnection.Connect(address, Port, NoDelay, Interval); + + // configure connection for max scale + ConfigureKcpConnection(clientConnection); + } + public override bool ClientSend(int channelId, ArraySegment segment) + { + if (clientConnection != null) + { + clientConnection.Send(segment); + return true; + } + Debug.LogWarning("KCP: can't send because client not connected!"); + return false; + } + + public override void ClientDisconnect() + { + // only if connected + // otherwise we end up in a deadlock because of an open Mirror bug: + // https://github.com/vis2k/Mirror/issues/2353 + if (clientConnected) + { + clientConnection?.Disconnect(); + clientConnection = null; + } + } + + HashSet connectionsToRemove = new HashSet(); + void UpdateServer() + { + while (serverSocket != null && serverSocket.Poll(0, SelectMode.SelectRead)) + { + int msgLength = serverSocket.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref serverNewClientEP); + //Debug.Log($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}"); + + // calculate connectionId from endpoint + int connectionId = serverNewClientEP.GetHashCode(); + + // is this a new connection? + if (!connections.TryGetValue(connectionId, out KcpServerConnection connection)) + { + // add it to a queue + connection = new KcpServerConnection(serverSocket, serverNewClientEP, NoDelay, Interval); + + // configure connection for max scale + ConfigureKcpConnection(connection); + + //acceptedConnections.Writer.TryWrite(connection); + connections.Add(connectionId, connection); + Debug.Log($"KCP: server added connection {serverNewClientEP}"); + + // setup connected event + connection.OnConnected += () => + { + // call mirror event + Debug.Log($"KCP: OnServerConnected({connectionId})"); + OnServerConnected.Invoke(connectionId); + }; + + // setup data event + connection.OnData += (message) => + { + // call mirror event + //Debug.Log($"KCP: OnServerDataReceived({connectionId}, {BitConverter.ToString(message.Array, message.Offset, message.Count)})"); + OnServerDataReceived.Invoke(connectionId, message); + }; + + // setup disconnected event + connection.OnDisconnected += () => + { + // flag for removal + // (can't remove directly because connection is updated + // and event is called while iterating all connections) + connectionsToRemove.Add(connectionId); + + // call mirror event + Debug.Log($"KCP: OnServerDisconnected({connectionId})"); + OnServerDisconnected.Invoke(connectionId); + }; + + // send handshake + connection.Handshake(); + } + + connection.RawInput(buffer, msgLength); + } + + // tick all server connections + foreach (KcpServerConnection connection in connections.Values) + { + connection.Tick(); + connection.Receive(); + } + + // remove disconnected connections + // (can't do it in connection.OnDisconnected because Tick is called + // while iterating connections) + foreach (int connectionId in connectionsToRemove) + { + connections.Remove(connectionId); + } + connectionsToRemove.Clear(); + } + + void UpdateClient() + { + // tick client connection + if (clientConnection != null) + { + clientConnection.Tick(); + // recv on socket + clientConnection.RawReceive(); + // recv on kcp + clientConnection.Receive(); + } + } + + // IMPORTANT: set script execution order to >1000 to call Transport's + // LateUpdate after all others. Fixes race condition where + // e.g. in uSurvival Transport would apply Cmds before + // ShoulderRotation.LateUpdate, resulting in projectile + // spawns at the point before shoulder rotation. + public void LateUpdate() + { + // note: we need to check enabled in case we set it to false + // when LateUpdate already started. + // (https://github.com/vis2k/Mirror/pull/379) + if (!enabled) + return; + + UpdateServer(); + UpdateClient(); + } + + // server + public override bool ServerActive() => serverSocket != null; + public override void ServerStart() + { + // only start once + if (serverSocket != null) + { + Debug.LogWarning("KCP: server already started!"); + } + + // listen + serverSocket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp); + serverSocket.DualMode = true; + serverSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, Port)); + } + public override bool ServerSend(int connectionId, int channelId, ArraySegment segment) + { + if (connections.TryGetValue(connectionId, out KcpServerConnection connection)) + { + connection.Send(segment); + return true; + } + return false; + } + public override bool ServerDisconnect(int connectionId) + { + if (connections.TryGetValue(connectionId, out KcpServerConnection connection)) + { + connection.Disconnect(); + return true; + } + return false; + } + public override string ServerGetClientAddress(int connectionId) + { + if (connections.TryGetValue(connectionId, out KcpServerConnection connection)) + { + return (connection.GetRemoteEndPoint() as IPEndPoint).Address.ToString(); + } + return ""; + } + public override void ServerStop() + { + serverSocket?.Close(); + serverSocket = null; + } + + // common + public override void Shutdown() {} + + // MTU + public override ushort GetMaxPacketSize() => Kcp.MTU_DEF; + + public override string ToString() + { + return "KCP"; + } + + int GetTotalSendQueue() => + connections.Values.Sum(conn => conn.kcp.snd_queue.Count); + int GetTotalReceiveQueue() => + connections.Values.Sum(conn => conn.kcp.rcv_queue.Count); + int GetTotalSendBuffer() => + connections.Values.Sum(conn => conn.kcp.snd_buf.Count); + int GetTotalReceiveBuffer() => + connections.Values.Sum(conn => conn.kcp.rcv_buf.Count); + + void OnGUI() + { + GUILayout.BeginArea(new Rect(5, 100, 300, 300)); + + if (ServerActive()) + { + GUILayout.BeginVertical("Box"); + GUILayout.Label("SERVER"); + GUILayout.Label(" SendQueue: " + GetTotalSendQueue()); + GUILayout.Label(" ReceiveQueue: " + GetTotalReceiveQueue()); + GUILayout.Label(" SendBuffer: " + GetTotalSendBuffer()); + GUILayout.Label(" ReceiveBuffer: " + GetTotalReceiveBuffer()); + GUILayout.EndVertical(); + } + + if (ClientConnected()) + { + GUILayout.BeginVertical("Box"); + GUILayout.Label("CLIENT"); + GUILayout.Label(" SendQueue: " + clientConnection.kcp.snd_queue.Count); + GUILayout.Label(" ReceiveQueue: " + clientConnection.kcp.rcv_queue.Count); + GUILayout.Label(" SendBuffer: " + clientConnection.kcp.snd_buf.Count); + GUILayout.Label(" ReceiveBuffer: " + clientConnection.kcp.rcv_buf.Count); + GUILayout.EndVertical(); + } + + GUILayout.EndArea(); + } + } +} +#endif diff --git a/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs.meta new file mode 100644 index 000000000..e87428e77 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/MirrorTransport/KcpTransport.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 6b0fecffa3f624585964b0d0eb21b18e +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k.meta new file mode 100644 index 000000000..f213a815b --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: b6fcfb8bb66ef4cac9bb90b51a0e932f +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE new file mode 100755 index 000000000..c77582e85 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE @@ -0,0 +1,24 @@ +MIT License + +Copyright (c) 2016 limpo1989 +Copyright (c) 2020 Paul Pacheco +Copyright (c) 2020 Lymdun +Copyright (c) 2020 vis2k + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE.meta new file mode 100644 index 000000000..49dc767e9 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/LICENSE.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: 9a3e8369060cf4e94ac117603de47aa6 +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION new file mode 100755 index 000000000..c5c3c51bc --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION @@ -0,0 +1,6 @@ +V1.0 +- Kcp.cs now mirrors original Kcp.c behaviour + (this fixes dozens of bugs) + +V0.1 +- initial kcp-csharp based version \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION.meta new file mode 100644 index 000000000..2a07daa5c --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/VERSION.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: ed3f2cf1bbf1b4d53a6f2c103d311f71 +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel.meta new file mode 100644 index 000000000..ba0483d4f --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 50a0273b27ec14f789a8e3c676e0bff4 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs new file mode 100644 index 000000000..52e433d03 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs @@ -0,0 +1,73 @@ +using UnityEngine; +using System.Net; +using System.Net.Sockets; + +namespace kcp2k +{ + public class KcpClientConnection : KcpConnection + { + readonly byte[] buffer = new byte[1500]; + + /// + /// Client connection, does not share the UDP client with anyone + /// so we can set up our own read loop + /// + /// + /// + public KcpClientConnection() : base() + { + } + + public void Connect(string host, ushort port, bool noDelay, uint interval = Kcp.INTERVAL) + { + Debug.Log($"KcpClient: connect to {host}:{port}"); + IPAddress[] ipAddress = Dns.GetHostAddresses(host); + if (ipAddress.Length < 1) + throw new SocketException((int)SocketError.HostNotFound); + + remoteEndpoint = new IPEndPoint(ipAddress[0], port); + socket = new Socket(remoteEndpoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + socket.Connect(remoteEndpoint); + SetupKcp(noDelay, interval); + + RawReceive(); + + Handshake(); + } + + // TODO call from transport update + public void RawReceive() + { + try + { + if (socket != null) + { + while (socket.Poll(0, SelectMode.SelectRead)) + { + int msgLength = socket.ReceiveFrom(buffer, ref remoteEndpoint); + //Debug.Log($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}"); + RawInput(buffer, msgLength); + } + + // wait a few MS to poll again + //await UniTask.Delay(2); + } + } + catch (SocketException) + { + // this is fine, the socket might have been closed in the other end + } + } + + protected override void Dispose() + { + socket.Close(); + socket = null; + } + + protected override void RawSend(byte[] data, int length) + { + socket.Send(data, length, SocketFlags.None); + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs.meta new file mode 100644 index 000000000..336991847 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpClientConnection.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 96512e74aa8214a6faa8a412a7a07877 +timeCreated: 1602601237 \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs new file mode 100644 index 000000000..f1ea60f84 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs @@ -0,0 +1,282 @@ +using System; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using Unity.Collections.LowLevel.Unsafe; +using Debug = UnityEngine.Debug; + +namespace kcp2k +{ + public abstract class KcpConnection + { + protected Socket socket; + protected EndPoint remoteEndpoint; + internal Kcp kcp; + volatile bool open; + + public event Action OnConnected; + public event Action> OnData; + public event Action OnDisconnected; + + // If we don't receive anything these many milliseconds + // then consider us disconnected + public const int TIMEOUT = 3000; + + // internal time. + // StopWatch offers ElapsedMilliSeconds and should be more precise than + // Unity's time.deltaTime over long periods. + readonly Stopwatch refTime = new Stopwatch(); + + // recv buffer to avoid allocations + byte[] buffer = new byte[Kcp.MTU_DEF]; + + volatile uint lastReceived; + + internal static readonly ArraySegment Hello = new ArraySegment(new byte[] { 0 }); + private static readonly ArraySegment Goodby = new ArraySegment(new byte[] { 1 }); + + // a connection is authenticated after sending the correct handshake. + // useful to protect against random data from the internet. + bool authenticated; + + protected KcpConnection() + { + } + + // NoDelay & interval are the most important configurations. + // let's force require the parameters so we don't forget it anywhere. + protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL) + { + kcp = new Kcp(0, RawSend); + kcp.SetNoDelay(noDelay ? 1u : 0u, interval); + refTime.Start(); + open = true; + + Tick(); + } + + public void Tick() + { + try + { + uint time = (uint)refTime.ElapsedMilliseconds; + + // TODO MirorrNG KCP used to set lastReceived here. but this + // would make the below time check always true. + // should we set lastReceived after updating instead? + //lastReceived = time; + + if (open && time < lastReceived + TIMEOUT) + { + kcp.Update(time); + + // check can be used to skip updates IF: + // - time < what check returned + // - AND send / recv haven't been called in that time + // (see Check() comments) + // + // for now, let's just always update and not call check. + //uint check = kcp.Check(); + } + } + catch (SocketException) + { + // this is ok, the connection was closed + open = false; + } + catch (ObjectDisposedException) + { + // fine, socket was closed, no more ticking needed + open = false; + } + catch (Exception ex) + { + open = false; + Debug.LogException(ex); + } + } + + public void RawInput(byte[] buffer, int msgLength) + { + int input = kcp.Input(buffer, msgLength); + if (input == 0) + { + lastReceived = (uint)refTime.ElapsedMilliseconds; + } + else Debug.LogWarning($"Input failed with error={input} for buffer with length={msgLength}"); + } + + protected abstract void RawSend(byte[] data, int length); + + public void Send(ArraySegment data) + { + // only allow sending up to MaxMessageSize sized messages. + // other end won't process bigger messages anyway. + if (data.Count <= Kcp.MTU_DEF) + { + int sent = kcp.Send(data.Array, data.Offset, data.Count); + if (sent < 0) + { + Debug.LogWarning($"Send failed with error={sent} for segment with length={data.Count}"); + } + } + else Debug.LogError($"Failed to send message of size {data.Count} because it's larger than MaxMessageSize={Kcp.MTU_DEF}"); + } + + protected virtual void Dispose() + { + } + + // ArraySegment content comparison without Linq + public static unsafe bool SegmentsEqual(ArraySegment a, ArraySegment b) + { + if (a.Count == b.Count) + { + fixed (byte* aPtr = &a.Array[a.Offset], + bPtr = &b.Array[b.Offset]) + { + return UnsafeUtility.MemCmp(aPtr, bPtr, a.Count) == 0; + } + } + return false; + } + + /// + /// reads a message from connection + /// + /// buffer where the message will be written + /// true if we got a message, false if we got disconnected + public void Receive() + { + if (!open) + { + OnDisconnected?.Invoke(); + Debug.LogWarning("DISCO a"); + return; + } + + // read as long as we have things to read + int msgSize; + while ((msgSize = kcp.PeekSize()) > 0) + { + // only allow receiving up to MaxMessageSize sized messages. + // otherwise we would get BlockCopy ArgumentException anyway. + if (msgSize <= Kcp.MTU_DEF) + { + int received = kcp.Receive(buffer, msgSize); + if (received >= 0) + { + ArraySegment dataSegment = new ArraySegment(buffer, 0, msgSize); + + // not authenticated yet? + if (!authenticated) + { + // handshake message? + if (SegmentsEqual(dataSegment, Hello)) + { + // we are only connected if we received the handshake. + // not just after receiving any first data. + authenticated = true; + //Debug.Log("KCP: received handshake"); + OnConnected?.Invoke(); + } + // otherwise it's random data from the internet, not + // from a legitimate player. + else + { + Debug.LogWarning("KCP: received random data before handshake. Disconnecting the connection."); + open = false; + OnDisconnected?.Invoke(); + break; + } + } + // authenticated. + else + { + // disconnect message? + if (SegmentsEqual(dataSegment, Goodby)) + { + // if we receive a disconnect message, then close everything + //Debug.Log("KCP: received disconnect message"); + open = false; + OnDisconnected?.Invoke(); + break; + } + // otherwise regular message + else + { + // only accept regular messages + //Debug.LogWarning($"Kcp recv msg: {BitConverter.ToString(buffer, 0, msgSize)}"); + OnData?.Invoke(dataSegment); + } + } + } + else + { + // if receive failed, close everything + Debug.LogWarning($"Receive failed with error={received}. closing connection."); + open = false; + OnDisconnected?.Invoke(); + break; + } + } + // we don't allow sending messages > Max, so this must be an + // attacker. let's disconnect to avoid allocation attacks etc. + else + { + Debug.LogWarning($"KCP: possible allocation attack for msgSize {msgSize} > max {Kcp.MTU_DEF}. Disconnecting the connection."); + open = false; + OnDisconnected?.Invoke(); + break; + } + } + } + + public void Handshake() + { + // send a greeting and see if the server replies + Debug.LogWarning("KcpConnection: sending Handshake to other end!"); + Send(Hello); + } + + /// + /// Disconnect this connection + /// + public virtual void Disconnect() + { + // send a disconnect message and disconnect + if (open && socket.Connected) + { + try + { + Send(Goodby); + kcp.Flush(); + + // call OnDisconnected event, even if we manually + // disconnected + OnDisconnected?.Invoke(); + } + catch (SocketException) + { + // this is ok, the connection was already closed + } + catch (ObjectDisposedException) + { + // this is normal when we stop the server + // the socket is stopped so we can't send anything anymore + // to the clients + + // the clients will eventually timeout and realize they + // were disconnected + } + } + open = false; + + // EOF is now available + //dataAvailable?.TrySetResult(); + } + + // get remote endpoint + public EndPoint GetRemoteEndPoint() => remoteEndpoint; + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs.meta new file mode 100644 index 000000000..fa5dcff95 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpConnection.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 3915c7c62b72d4dc2a9e4e76c94fc484 +timeCreated: 1602600432 \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs new file mode 100644 index 000000000..b7b7ef019 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs @@ -0,0 +1,20 @@ +using System.Net; +using System.Net.Sockets; + +namespace kcp2k +{ + public class KcpServerConnection : KcpConnection + { + public KcpServerConnection(Socket socket, EndPoint remoteEndpoint, bool noDelay, uint interval = Kcp.INTERVAL) + { + this.socket = socket; + this.remoteEndpoint = remoteEndpoint; + SetupKcp(noDelay, interval); + } + + protected override void RawSend(byte[] data, int length) + { + socket.SendTo(data, 0, length, SocketFlags.None, remoteEndpoint); + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs.meta new file mode 100644 index 000000000..10d9803d8 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/highlevel/KcpServerConnection.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 80a9b1ce9a6f14abeb32bfa9921d097b +timeCreated: 1602601483 \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp.meta new file mode 100644 index 000000000..d91e774e9 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: da30be07fda954844b621fec7c727b6a +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs new file mode 100644 index 000000000..5fe5547e1 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("kcp2k.Tests")] \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs.meta new file mode 100644 index 000000000..6b442a944 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/AssemblyInfo.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: aec6a15ac7bd43129317ea1f01f19782 +timeCreated: 1602665988 \ No newline at end of file diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs new file mode 100755 index 000000000..b658c641a --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs @@ -0,0 +1,36 @@ +// byte[] buffer with Position, resizes automatically. +// There is no size limit because we will only use it with ~MTU sized arrays. +using System; +using System.Runtime.CompilerServices; + +namespace kcp2k +{ + public class ByteBuffer + { + public int Position; + internal const int InitialCapacity = 1200; + public byte[] RawBuffer = new byte[InitialCapacity]; + + // resize to 'value' capacity if needed + [MethodImpl(MethodImplOptions.AggressiveInlining)] + void EnsureCapacity(int value) + { + if (RawBuffer.Length < value) + { + int capacity = Math.Max(value, RawBuffer.Length * 2); + Array.Resize(ref RawBuffer, capacity); + } + } + + // write bytes at offset + public void WriteBytes(byte[] bytes, int offset, int count) + { + if (offset >= 0 && count > 0) + { + EnsureCapacity(Position + count); + Buffer.BlockCopy(bytes, offset, RawBuffer, Position, count); + Position += count; + } + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs.meta new file mode 100755 index 000000000..e66166bb4 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/ByteBuffer.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a9e4197a65c254ec0bf52ae9be1b340b +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs new file mode 100755 index 000000000..f821c1659 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs @@ -0,0 +1,1021 @@ +// Kcp based on https://github.com/skywind3000/kcp +// Kept as close to original as possible. +using System; +using System.Collections.Generic; +using UnityEngine; + +namespace kcp2k +{ + public class Kcp + { + // original Kcp has a define option, which is not defined by default: + // #define FASTACK_CONSERVE + + public const int RTO_NDL = 30; // no delay min rto + public const int RTO_MIN = 100; // normal min rto + public const int RTO_DEF = 200; // default RTO + public const int RTO_MAX = 60000; // maximum RTO + public const int CMD_PUSH = 81; // cmd: push data + public const int CMD_ACK = 82; // cmd: ack + public const int CMD_WASK = 83; // cmd: window probe (ask) + public const int CMD_WINS = 84; // cmd: window size (tell) + public const int ASK_SEND = 1; // need to send CMD_WASK + public const int ASK_TELL = 2; // need to send CMD_WINS + public const int WND_SND = 32; // default send window + public const int WND_RCV = 128; // default receive window. must be >= max fragment size + public const int MTU_DEF = 1200; // default MTU (reduced to 1200 to fit all cases: https://en.wikipedia.org/wiki/Maximum_transmission_unit ; steam uses 1200 too!) + public const int ACK_FAST = 3; + public const int INTERVAL = 100; + public const int OVERHEAD = 24; + public const int DEADLINK = 20; + public const int THRESH_INIT = 2; + public const int THRESH_MIN = 2; + public const int PROBE_INIT = 7000; // 7 secs to probe window size + public const int PROBE_LIMIT = 120000; // up to 120 secs to probe window + public const int FASTACK_LIMIT = 5; // max times to trigger fastack + + internal struct AckItem + { + internal uint serialNumber; + internal uint timestamp; + } + + // kcp members. + int state; + readonly uint conv; // conversation + internal uint mtu; + internal uint mss; // maximum segment size + internal uint snd_una; // unacknowledged + internal uint snd_nxt; + internal uint rcv_nxt; + internal uint ssthresh; // slow start threshold + internal int rx_rttval; + internal int rx_srtt; // smoothed round trip time + internal int rx_rto; + internal int rx_minrto; + internal uint snd_wnd; // send window + internal uint rcv_wnd; // receive window + internal uint rmt_wnd; // remote window + internal uint cwnd; // congestion window + internal uint probe; + internal uint interval; + internal uint ts_flush; + internal uint xmit; + internal uint nodelay; // not a bool. original Kcp has '<2 else' check. + internal bool updated; + internal uint ts_probe; // timestamp probe + internal uint probe_wait; + internal uint dead_link; + internal uint incr; + internal uint current; // current time (milliseconds). set by Update. + + internal int fastresend; + internal int fastlimit; + internal bool nocwnd; + internal readonly Queue snd_queue = new Queue(16); // send queue + internal readonly Queue rcv_queue = new Queue(16); // receive queue + // snd_buffer needs index removals. + // C# LinkedList allocates for each entry, so let's keep List for now. + internal readonly List snd_buf = new List(16); // send buffer + // rcv_buffer needs index insertions and backwards iteration. + // C# LinkedList allocates for each entry, so let's keep List for now. + internal readonly List rcv_buf = new List(16); // receive buffer + internal readonly List acklist = new List(16); + + internal byte[] buffer; + readonly Action output; // buffer, size + + // get how many packet is waiting to be sent + public int WaitSnd => snd_buf.Count + snd_queue.Count; + + // ikcp_create + // create a new kcp control object, 'conv' must equal in two endpoint + // from the same connection. + public Kcp(uint conv, Action output) + { + this.conv = conv; + this.output = output; + snd_wnd = WND_SND; + rcv_wnd = WND_RCV; + rmt_wnd = WND_RCV; + mtu = MTU_DEF; + mss = mtu - OVERHEAD; + rx_rto = RTO_DEF; + rx_minrto = RTO_MIN; + interval = INTERVAL; + ts_flush = INTERVAL; + ssthresh = THRESH_INIT; + fastlimit = FASTACK_LIMIT; + dead_link = DEADLINK; + buffer = new byte[(mtu + OVERHEAD) * 3]; + } + + // ikcp_segment_new + // we keep the original function and add our pooling to it. + // this way we'll never miss it anywhere. + static Segment SegmentNew() + { + return Segment.Take(); + } + + // ikcp_segment_delete + // we keep the original function and add our pooling to it. + // this way we'll never miss it anywhere. + static void SegmentDelete(Segment seg) + { + Segment.Return(seg); + } + + // ikcp_recv + // receive data from kcp state machine + // returns number of bytes read. + // returns negative on error. + // note: pass negative length to peek. + public int Receive(byte[] buffer, int len) + { + // kcp's ispeek feature is not supported. + // this makes 'merge fragment' code significantly easier because + // we can iterate while queue.Count > 0 and dequeue each time. + // if we had to consider ispeek then count would always be > 0 and + // we would have to remove only after the loop. + // + //bool ispeek = len < 0; + if (len < 0) + throw new NotSupportedException("Receive ispeek for negative len is not supported!"); + + if (rcv_queue.Count == 0) + return -1; + + if (len < 0) len = -len; + + int peeksize = PeekSize(); + + if (peeksize < 0) + return -2; + + if (peeksize > len) + return -3; + + bool recover = rcv_queue.Count >= rcv_wnd; + + // merge fragment. + int offset = 0; + len = 0; + // original KCP iterates rcv_queue and deletes if !ispeek. + // removing from a c# queue while iterating is not possible, but + // we can change to 'while Count > 0' and remove every time. + // (we can remove every time because we removed ispeek support!) + while (rcv_queue.Count > 0) + { + // unlike original kcp, we dequeue instead of just getting the + // entry. this is fine because we remove it in ANY case. + Segment seg = rcv_queue.Dequeue(); + + Buffer.BlockCopy(seg.data.RawBuffer, 0, buffer, offset, seg.data.Position); + offset += seg.data.Position; + + len += seg.data.Position; + uint fragment = seg.frg; + + // note: ispeek is not supported in order to simplify this loop + + // unlike original kcp, we don't need to remove seg from queue + // because we already dequeued it. + // simply delete it + SegmentDelete(seg); + + if (fragment == 0) + break; + } + + // move available data from rcv_buf -> rcv_queue + int removed = 0; + foreach (Segment seg in rcv_buf) + { + if (seg.sn == rcv_nxt && rcv_queue.Count < rcv_wnd) + { + // can't remove while iterating. remember how many to remove + // and do it after the loop. + // note: don't return segment. we only add it to rcv_queue + ++removed; + // add + rcv_queue.Enqueue(seg); + rcv_nxt++; + } + else + { + break; + } + } + rcv_buf.RemoveRange(0, removed); + + // fast recover + if (rcv_queue.Count < rcv_wnd && recover) + { + // ready to send back CMD_WINS in flush + // tell remote my window size + probe |= ASK_TELL; + } + + return len; + } + + // ikcp_peeksize + // check the size of next message in the recv queue + public int PeekSize() + { + int length = 0; + + if (rcv_queue.Count == 0) return -1; + + Segment seq = rcv_queue.Peek(); + if (seq.frg == 0) return seq.data.Position; + + if (rcv_queue.Count < seq.frg + 1) return -1; + + foreach (Segment seg in rcv_queue) + { + length += seg.data.Position; + if (seg.frg == 0) break; + } + + return length; + } + + // ikcp_send + // sends byte[] to the other end. + public int Send(byte[] buffer, int offset, int len) + { + int count; + + if (len < 0) return -1; + + // streaming mode: removed. we never want to send 'hello' and + // receive 'he' 'll' 'o'. we want to always receive 'hello'. + + if (len <= mss) count = 1; + else count = (int)((len + mss - 1) / mss); + + if (count >= WND_RCV) return -2; + + if (count == 0) count = 1; + + // fragment + for (int i = 0; i < count; i++) + { + int size = len > (int)mss ? (int)mss : len; + Segment seg = SegmentNew(); + + if (len > 0) + { + seg.data.WriteBytes(buffer, offset, size); + } + // seg.len = size: WriteBytes sets segment.Position! + seg.frg = (byte)(count - i - 1); + snd_queue.Enqueue(seg); + offset += size; + len -= size; + } + + return 0; + } + + // ikcp_update_ack + void UpdateAck(int rtt) // round trip time + { + // https://tools.ietf.org/html/rfc6298 + if (rx_srtt == 0) + { + rx_srtt = rtt; + rx_rttval = rtt / 2; + } + else + { + int delta = rtt - rx_srtt; + if (delta < 0) delta = -delta; + rx_rttval = (3 * rx_rttval + delta) / 4; + rx_srtt = (7 * rx_srtt + rtt) / 8; + if (rx_srtt < 1) rx_srtt = 1; + } + int rto = rx_srtt + Math.Max((int)interval, 4 * rx_rttval); + rx_rto = Mathf.Clamp(rto, rx_minrto, RTO_MAX); + } + + // ikcp_shrink_buf + internal void ShrinkBuf() + { + if (snd_buf.Count > 0) + { + Segment seg = snd_buf[0]; + snd_una = seg.sn; + } + else + { + snd_una = snd_nxt; + } + } + + // ikcp_parse_ack + // removes the segment with 'sn' from send buffer + internal void ParseAck(uint sn) + { + if (Utils.TimeDiff(sn, snd_una) < 0 || Utils.TimeDiff(sn, snd_nxt) >= 0) + return; + + // for-int so we can erase while iterating + for (int i = 0; i < snd_buf.Count; ++i) + { + Segment seg = snd_buf[i]; + if (sn == seg.sn) + { + snd_buf.RemoveAt(i); + SegmentDelete(seg); + break; + } + if (Utils.TimeDiff(sn, seg.sn) < 0) + { + break; + } + } + } + + // ikcp_parse_una + void ParseUna(uint una) + { + int removed = 0; + foreach (Segment seg in snd_buf) + { + if (Utils.TimeDiff(una, seg.sn) > 0) + { + // can't remove while iterating. remember how many to remove + // and do it after the loop. + ++removed; + SegmentDelete(seg); + } + else + { + break; + } + } + snd_buf.RemoveRange(0, removed); + } + + // ikcp_parse_fastack + void ParseFastack(uint sn, uint ts) + { + if (Utils.TimeDiff(sn, snd_una) < 0 || Utils.TimeDiff(sn, snd_nxt) >= 0) + return; + + foreach (Segment seg in snd_buf) + { + if (Utils.TimeDiff(sn, seg.sn) < 0) + { + break; + } + else if (sn != seg.sn) + { +#if !FASTACK_CONSERVE + seg.fastack++; +#else + if (Utils.TimeDiff(ts, seg.ts) >= 0) + seg.fastack++; +#endif + } + } + } + + // ikcp_ack_push + // appends an ack. + void AckPush(uint sn, uint ts) + { + acklist.Add(new AckItem{ serialNumber = sn, timestamp = ts }); + } + + // ikcp_parse_data + void ParseData(Segment newseg) + { + uint sn = newseg.sn; + + if (Utils.TimeDiff(sn, rcv_nxt + rcv_wnd) >= 0 || + Utils.TimeDiff(sn, rcv_nxt) < 0) + { + SegmentDelete(newseg); + return; + } + + InsertSegmentInReceiveBuffer(newseg); + MoveReceiveBufferDataToReceiveQueue(); + } + + // inserts the segment into rcv_buf, ordered by seg.sn. + // drops the segment if one with the same seg.sn already exists. + // goes through receive buffer in reverse order for performance. + // + // note: see KcpTests.InsertSegmentInReceiveBuffer test! + // note: 'insert or delete' can be done in different ways, but let's + // keep consistency with original C kcp. + internal void InsertSegmentInReceiveBuffer(Segment newseg) + { + bool repeat = false; // 'duplicate' + + // original C iterates backwards, so we need to do that as well. + int i; + for (i = rcv_buf.Count - 1; i >= 0; i--) + { + Segment seg = rcv_buf[i]; + if (seg.sn == newseg.sn) + { + // duplicate segment found. nothing will be added. + repeat = true; + break; + } + if (Utils.TimeDiff(newseg.sn, seg.sn) > 0) + { + // this entry's sn is < newseg.sn, so let's stop + break; + } + } + + // no duplicate? then insert. + if (!repeat) + { + rcv_buf.Insert(i + 1, newseg); + } + // duplicate. just delete it. + else + { + SegmentDelete(newseg); + } + } + + // move available data from rcv_buf -> rcv_queue + void MoveReceiveBufferDataToReceiveQueue() + { + int removed = 0; + foreach (Segment seg in rcv_buf) + { + if (seg.sn == rcv_nxt && rcv_queue.Count < rcv_wnd) + { + // can't remove while iterating. remember how many to remove + // and do it after the loop. + ++removed; + rcv_queue.Enqueue(seg); + rcv_nxt++; + } + else + { + break; + } + } + rcv_buf.RemoveRange(0, removed); + } + + // ikcp_input + /// used when you receive a low level packet (eg. UDP packet) + public int Input(byte[] data, int size) + { + uint prev_una = snd_una; + uint maxack = 0; + uint latest_ts = 0; + int flag = 0; + + if (data == null || size < OVERHEAD) return -1; + + int offset = 0; + + while (true) + { + uint ts = 0; + uint sn = 0; + uint len = 0; + uint una = 0; + uint conv_ = 0; + ushort wnd = 0; + byte cmd = 0; + byte frg = 0; + + if (size < OVERHEAD) break; + + offset += Utils.Decode32U(data, offset, ref conv_); + if (conv_ != conv) return -1; + + offset += Utils.Decode8u(data, offset, ref cmd); + offset += Utils.Decode8u(data, offset, ref frg); + offset += Utils.Decode16U(data, offset, ref wnd); + offset += Utils.Decode32U(data, offset, ref ts); + offset += Utils.Decode32U(data, offset, ref sn); + offset += Utils.Decode32U(data, offset, ref una); + offset += Utils.Decode32U(data, offset, ref len); + + size -= OVERHEAD; + + if (size < len || len < 0) return -2; + + if (cmd != CMD_PUSH && cmd != CMD_ACK && + cmd != CMD_WASK && cmd != CMD_WINS) + return -3; + + rmt_wnd = wnd; + ParseUna(una); + ShrinkBuf(); + + if (cmd == CMD_ACK) + { + if (Utils.TimeDiff(current, ts) >= 0) + { + UpdateAck(Utils.TimeDiff(current, ts)); + } + ParseAck(sn); + ShrinkBuf(); + if (flag == 0) + { + flag = 1; + maxack = sn; + latest_ts = ts; + } + else + { + if (Utils.TimeDiff(sn, maxack) > 0) + { +#if !FASTACK_CONSERVE + maxack = sn; + latest_ts = ts; +#else + if (Utils.TimeDiff(ts, latest_ts) > 0) + { + maxack = sn; + latest_ts = ts; + } +#endif + } + } + } + else if (cmd == CMD_PUSH) + { + if (Utils.TimeDiff(sn, rcv_nxt + rcv_wnd) < 0) + { + AckPush(sn, ts); + if (Utils.TimeDiff(sn, rcv_nxt) >= 0) + { + Segment seg = SegmentNew(); + seg.conv = conv_; + seg.cmd = cmd; + seg.frg = frg; + seg.wnd = wnd; + seg.ts = ts; + seg.sn = sn; + seg.una = una; + if (len > 0) + { + seg.data.WriteBytes(data, offset, (int)len); + } + ParseData(seg); + } + } + } + else if (cmd == CMD_WASK) + { + // ready to send back CMD_WINS in flush + // tell remote my window size + probe |= ASK_TELL; + } + else if (cmd == CMD_WINS) + { + // do nothing + } + else + { + return -3; + } + + offset += (int)len; + size -= (int)len; + } + + if (flag != 0) + { + ParseFastack(maxack, latest_ts); + } + + // cwnd update when packet arrived + if (Utils.TimeDiff(snd_una, prev_una) > 0) + { + if (cwnd < rmt_wnd) + { + if (cwnd < ssthresh) + { + cwnd++; + incr += mss; + } + else + { + if (incr < mss) incr = mss; + incr += (mss * mss) / incr + (mss / 16); + if ((cwnd + 1) * mss <= incr) + { + cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1); + } + } + if (cwnd > rmt_wnd) + { + cwnd = rmt_wnd; + incr = rmt_wnd * mss; + } + } + } + + return 0; + } + + // ikcp_wnd_unused + uint WndUnused() + { + if (rcv_queue.Count < rcv_wnd) + return rcv_wnd - (uint)rcv_queue.Count; + return 0; + } + + // ikcp_flush + // flush remain ack segments + public void Flush() + { + int offset = 0; // buffer ptr in original C + bool lost = false; // lost segments + + // helper functions + void MakeSpace(int space) + { + if (offset + space > mtu) + { + output(buffer, offset); + offset = 0; + } + } + + void FlushBuffer() + { + if (offset > 0) + { + output(buffer, offset); + } + } + + // 'ikcp_update' haven't been called. + if (!updated) return; + + // kcp only stack allocs a segment here for performance, leaving + // its data buffer null because this segment's data buffer is never + // used. that's fine in C, but in C# our segment is class so we need + // to allocate and most importantly, not forget to deallocate it + // before returning. + Segment seg = SegmentNew(); + seg.conv = conv; + seg.cmd = CMD_ACK; + seg.wnd = WndUnused(); + seg.una = rcv_nxt; + + // flush acknowledges + foreach (AckItem ack in acklist) + { + MakeSpace(OVERHEAD); + // ikcp_ack_get assigns ack[i] to seg.sn, seg.ts + seg.sn = ack.serialNumber; + seg.ts = ack.timestamp; + offset += seg.Encode(buffer, offset); + } + + acklist.Clear(); + + // probe window size (if remote window size equals zero) + if (rmt_wnd == 0) + { + if (probe_wait == 0) + { + probe_wait = PROBE_INIT; + ts_probe = current + probe_wait; + } + else + { + if (Utils.TimeDiff(current, ts_probe) >= 0) + { + if (probe_wait < PROBE_INIT) + probe_wait = PROBE_INIT; + probe_wait += probe_wait / 2; + if (probe_wait > PROBE_LIMIT) + probe_wait = PROBE_LIMIT; + ts_probe = current + probe_wait; + probe |= ASK_SEND; + } + } + } + else + { + ts_probe = 0; + probe_wait = 0; + } + + // flush window probing commands + if ((probe & ASK_SEND) != 0) + { + seg.cmd = CMD_WASK; + MakeSpace(OVERHEAD); + offset += seg.Encode(buffer, offset); + } + + // flush window probing commands + if ((probe & ASK_TELL) != 0) + { + seg.cmd = CMD_WINS; + MakeSpace(OVERHEAD); + offset += seg.Encode(buffer, offset); + } + + probe = 0; + + // calculate window size + uint cwnd_ = Math.Min(snd_wnd, rmt_wnd); + if (!nocwnd) cwnd_ = Math.Min(cwnd, cwnd_); + + // move data from snd_queue to snd_buf + // sliding window, controlled by snd_nxt && sna_una+cwnd + while (Utils.TimeDiff(snd_nxt, snd_una + cwnd_) < 0) + { + if (snd_queue.Count == 0) break; + + Segment newseg = snd_queue.Dequeue(); + + newseg.conv = conv; + newseg.cmd = CMD_PUSH; + newseg.wnd = seg.wnd; + newseg.ts = current; + newseg.sn = snd_nxt++; + newseg.una = rcv_nxt; + newseg.resendts = current; + newseg.rto = rx_rto; + newseg.fastack = 0; + newseg.xmit = 0; + snd_buf.Add(newseg); + } + + // calculate resent + uint resent = fastresend > 0 ? (uint)fastresend : 0xffffffff; + uint rtomin = nodelay == 0 ? (uint)rx_rto >> 3 : 0; + + // flush data segments + int change = 0; + foreach (Segment segment in snd_buf) + { + bool needsend = false; + // initial transmit + if (segment.xmit == 0) + { + needsend = true; + segment.xmit++; + segment.rto = rx_rto; + segment.resendts = current + (uint)segment.rto + rtomin; + } + // RTO + else if (Utils.TimeDiff(current, segment.resendts) >= 0) + { + needsend = true; + segment.xmit++; + xmit++; + if (nodelay == 0) + { + segment.rto += Math.Max(segment.rto, rx_rto); + } + else + { + int step = (nodelay < 2) ? segment.rto : rx_rto; + segment.rto += step / 2; + } + segment.resendts = current + (uint)segment.rto; + lost = true; + } + // fast retransmit + else if (segment.fastack >= resent) + { + if (segment.xmit <= fastlimit || fastlimit <= 0) + { + needsend = true; + segment.xmit++; + segment.fastack = 0; + segment.resendts = current + (uint)segment.rto; + change++; + } + } + + if (needsend) + { + segment.ts = current; + segment.wnd = seg.wnd; + segment.una = rcv_nxt; + + int need = OVERHEAD + segment.data.Position; + MakeSpace(need); + + offset += segment.Encode(buffer, offset); + + if (segment.data.Position > 0) + { + Buffer.BlockCopy(segment.data.RawBuffer, 0, buffer, offset, segment.data.Position); + offset += segment.data.Position; + } + + if (segment.xmit >= dead_link) + { + state = -1; + } + } + } + + // kcp stackallocs 'seg'. our C# segment is a class though, so we + // need to properly delete and return it to the pool now that we are + // done with it. + SegmentDelete(seg); + + // flash remain segments + FlushBuffer(); + + // update ssthresh + // rate halving, https://tools.ietf.org/html/rfc6937 + if (change > 0) + { + uint inflight = snd_nxt - snd_una; + ssthresh = inflight / 2; + if (ssthresh < THRESH_MIN) + ssthresh = THRESH_MIN; + cwnd = ssthresh + resent; + incr = cwnd * mss; + } + + // congestion control, https://tools.ietf.org/html/rfc5681 + if (lost) + { + // original C uses 'cwnd', not kcp->cwnd! + ssthresh = cwnd_ / 2; + if (ssthresh < THRESH_MIN) + ssthresh = THRESH_MIN; + cwnd = 1; + incr = mss; + } + + if (cwnd < 1) + { + cwnd = 1; + incr = mss; + } + } + + // ikcp_update + // update state (call it repeatedly, every 10ms-100ms), or you can ask + // Check() when to call it again (without Input/Send calling). + // + // 'current' - current timestamp in millisec. pass it to Kcp so that + // Kcp doesn't have to do any stopwatch/deltaTime/etc. code + public void Update(uint currentTimeMilliSeconds) + { + current = currentTimeMilliSeconds; + + if (!updated) + { + updated = true; + ts_flush = current; + } + + int slap = Utils.TimeDiff(current, ts_flush); + + if (slap >= 10000 || slap < -10000) + { + ts_flush = current; + slap = 0; + } + + if (slap >= 0) + { + ts_flush += interval; + if (Utils.TimeDiff(current, ts_flush) >= 0) + { + ts_flush = current + interval; + } + Flush(); + } + } + + // ikcp_check + // Determine when should you invoke update + // Returns when you should invoke update in millisec, if there is no + // input/send calling. you can call update in that time, instead of + // call update repeatly. + // + // Important to reduce unnecessary update invoking. use it to schedule + // update (eg. implementing an epoll-like mechanism, or optimize update + // when handling massive kcp connections). + public uint Check(uint current_) + { + uint ts_flush_ = ts_flush; + int tm_flush = 0x7fffffff; + int tm_packet = 0x7fffffff; + + if (!updated) + { + return current_; + } + + if (Utils.TimeDiff(current_, ts_flush_) >= 10000 || + Utils.TimeDiff(current_, ts_flush_) < -10000) + { + ts_flush_ = current_; + } + + if (Utils.TimeDiff(current_, ts_flush_) >= 0) + { + return current_; + } + + tm_flush = Utils.TimeDiff(ts_flush_, current_); + + foreach (Segment seg in snd_buf) + { + int diff = Utils.TimeDiff(seg.resendts, current_); + if (diff <= 0) + { + return current_; + } + if (diff < tm_packet) tm_packet = diff; + } + + uint minimal = (uint)(tm_packet < tm_flush ? tm_packet : tm_flush); + if (minimal >= interval) minimal = interval; + + return current_ + minimal; + } + + // ikcp_setmtu + // Change MTU (Maximum Transmission Unit) size. + public void SetMtu(uint mtu) + { + if (mtu < 50 || mtu < OVERHEAD) + throw new ArgumentException("MTU must be higher than 50 and higher than OVERHEAD"); + + buffer = new byte[(mtu + OVERHEAD) * 3]; + this.mtu = mtu; + mss = mtu - OVERHEAD; + } + + // ikcp_interval + public void SetInterval(uint interval) + { + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + this.interval = interval; + } + + // ikcp_nodelay + // Normal: false, 40, 0, 0 + // Fast: false, 30, 2, 1 + // Fast2: true, 20, 2, 1 + // Fast3: true, 10, 2, 1 + public void SetNoDelay(uint nodelay, uint interval = INTERVAL, int resend = 0, bool nocwnd = false) + { + this.nodelay = nodelay; + if (nodelay != 0) + { + rx_minrto = RTO_NDL; + } + else + { + rx_minrto = RTO_MIN; + } + + if (interval >= 0) + { + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + this.interval = interval; + } + + if (resend >= 0) + { + fastresend = resend; + } + + this.nocwnd = nocwnd; + } + + // ikcp_wndsize + public void SetWindowSize(uint sendWindow, uint receiveWindow) + { + if (sendWindow > 0) + { + snd_wnd = sendWindow; + } + + if (receiveWindow > 0) + { + // must >= max fragment size + rcv_wnd = Math.Max(receiveWindow, WND_RCV); + } + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs.meta new file mode 100755 index 000000000..935b4239c --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Kcp.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a59b1cae10a334faf807432ab472f212 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs new file mode 100755 index 000000000..1fcd6339a --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs @@ -0,0 +1,84 @@ +using System.Collections.Generic; + +namespace kcp2k +{ + // KCP Segment Definition + internal class Segment + { + internal uint conv; // conversation + internal uint cmd; // command, e.g. Kcp.CMD_ACK etc. + internal uint frg; // fragment + internal uint wnd; // window + internal uint ts; // timestamp + internal uint sn; // serial number + internal uint una; + internal uint resendts; // resend timestamp + internal int rto; + internal uint fastack; + internal uint xmit; + internal ByteBuffer data; + + // pool //////////////////////////////////////////////////////////////// + internal static readonly Stack Pool = new Stack(32); + + public static Segment Take() + { + if (Pool.Count > 0) + { + Segment seg = Pool.Pop(); + return seg; + } + return new Segment(); + } + + public static void Return(Segment seg) + { + seg.Reset(); + Pool.Push(seg); + } + //////////////////////////////////////////////////////////////////////// + internal Segment() + { + // allocate the ByteBuffer once. + // note that we don't need to pool ByteBuffer, because Segment is + // already pooled. + data = new ByteBuffer(); + } + + // ikcp_encode_seg + // encode a segment into buffer + internal int Encode(byte[] ptr, int offset) + { + int offset_ = offset; + offset += Utils.Encode32U(ptr, offset, conv); + offset += Utils.Encode8u(ptr, offset, (byte)cmd); + offset += Utils.Encode8u(ptr, offset, (byte)frg); + offset += Utils.Encode16U(ptr, offset, (ushort)wnd); + offset += Utils.Encode32U(ptr, offset, ts); + offset += Utils.Encode32U(ptr, offset, sn); + offset += Utils.Encode32U(ptr, offset, una); + offset += Utils.Encode32U(ptr, offset, (uint)data.Position); + + return offset - offset_; + } + + // reset to return a fresh segment to the pool + internal void Reset() + { + conv = 0; + cmd = 0; + frg = 0; + wnd = 0; + ts = 0; + sn = 0; + una = 0; + rto = 0; + xmit = 0; + resendts = 0; + fastack = 0; + + // keep buffer for next pool usage, but reset position + data.Position = 0; + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs.meta new file mode 100755 index 000000000..d14dc1a1a --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Segment.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: fc58706a05dd3442c8fde858d5266855 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs new file mode 100755 index 000000000..27331c7b7 --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs @@ -0,0 +1,68 @@ +using System.Runtime.CompilerServices; + +namespace kcp2k +{ + public static class Utils + { + // encode 8 bits unsigned int + public static int Encode8u(byte[] p, int offset, byte c) + { + p[0 + offset] = c; + return 1; + } + + // decode 8 bits unsigned int + public static int Decode8u(byte[] p, int offset, ref byte c) + { + c = p[0 + offset]; + return 1; + } + + /* encode 16 bits unsigned int (lsb) */ + public static int Encode16U(byte[] p, int offset, ushort w) + { + p[0 + offset] = (byte)(w >> 0); + p[1 + offset] = (byte)(w >> 8); + return 2; + } + + /* decode 16 bits unsigned int (lsb) */ + public static int Decode16U(byte[] p, int offset, ref ushort c) + { + ushort result = 0; + result |= p[0 + offset]; + result |= (ushort)(p[1 + offset] << 8); + c = result; + return 2; + } + + /* encode 32 bits unsigned int (lsb) */ + public static int Encode32U(byte[] p, int offset, uint l) + { + p[0 + offset] = (byte)(l >> 0); + p[1 + offset] = (byte)(l >> 8); + p[2 + offset] = (byte)(l >> 16); + p[3 + offset] = (byte)(l >> 24); + return 4; + } + + /* decode 32 bits unsigned int (lsb) */ + public static int Decode32U(byte[] p, int offset, ref uint c) + { + uint result = 0; + result |= p[0 + offset]; + result |= (uint)(p[1 + offset] << 8); + result |= (uint)(p[2 + offset] << 16); + result |= (uint)(p[3 + offset] << 24); + c = result; + return 4; + } + + // timediff was a macro in original Kcp. let's inline it if possible. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int TimeDiff(uint later, uint earlier) + { + return (int)(later - earlier); + } + } +} diff --git a/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs.meta b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs.meta new file mode 100755 index 000000000..86118bc9f --- /dev/null +++ b/Assets/Mirror/Runtime/Transport/KCP/kcp2k/kcp/Utils.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ef959eb716205bd48b050f010a9a35ae +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: