kcp2k V1.22

This commit is contained in:
vis2k 2022-11-24 18:25:23 +01:00
parent 563f6feea0
commit f0aae71f16
5 changed files with 53 additions and 33 deletions

View File

@ -1,3 +1,6 @@
V1.22 [2022-11-24]
- KcpPeer.RawInput: add offset parameter
V1.21 [2022-11-24]
- high level refactor, part one.
- KcpPeer instead of KcpConnection, KcpClientConnection, KcpServerConnection

View File

@ -145,7 +145,7 @@ protected virtual void RawReceive()
int msgLength = socket.Receive(rawReceiveBuffer);
//Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
peer.RawInput(rawReceiveBuffer, msgLength);
peer.RawInput(rawReceiveBuffer, 0, msgLength);
}
}
// this is fine, the socket might have been closed in the other end

View File

@ -490,22 +490,25 @@ public void TickOutgoing()
}
}
public void RawInput(byte[] buffer, int msgLength)
// insert raw IO. usually from socket.Receive.
// offset is useful for relays, where we may parse a header and then
// feed the rest to kcp.
public void RawInput(byte[] buffer, int offset, int size)
{
// parse channel
if (msgLength > 0)
if (size > 0)
{
byte channel = buffer[0];
byte channel = buffer[offset + 0];
switch (channel)
{
case (byte)KcpChannel.Reliable:
{
// input into kcp, but skip channel byte
int input = kcp.Input(buffer, 1, msgLength - 1);
int input = kcp.Input(buffer, offset + 1, size - 1);
if (input != 0)
{
// GetType() shows Server/ClientConn instead of just Connection.
Log.Warning($"{GetType()}: Input failed with error={input} for buffer with length={msgLength - 1}");
Log.Warning($"{GetType()}: Input failed with error={input} for buffer with length={size - 1}");
}
break;
}
@ -532,7 +535,7 @@ public void RawInput(byte[] buffer, int msgLength)
// the current state allows it.
if (state == KcpState.Authenticated)
{
ArraySegment<byte> message = new ArraySegment<byte>(buffer, 1, msgLength - 1);
ArraySegment<byte> message = new ArraySegment<byte>(buffer, offset + 1, size - 1);
OnData?.Invoke(message, KcpChannel.Unreliable);
// set last receive time to avoid timeout.

View File

@ -103,9 +103,9 @@ public KcpServer(Action<int> OnConnected,
: new IPEndPoint(IPAddress.Any, 0);
}
public bool IsActive() => socket != null;
public virtual bool IsActive() => socket != null;
public void Start(ushort port)
public virtual void Start(ushort port)
{
// only start once
if (socket != null)
@ -166,19 +166,28 @@ public IPEndPoint GetClientEndPoint(int connectionId)
return null;
}
// io - poll.
// return true if there is data to read.
// after which RawReceive will be called.
// virtual because for relays,
protected virtual bool RawPoll() =>
socket != null && socket.Poll(0, SelectMode.SelectRead);
// io - input.
// virtual so it may be modified for relays, nonalloc workaround, etc.
// https://github.com/vis2k/where-allocation
protected virtual int RawReceive(byte[] buffer, out int connectionHash)
// bool return because not all receives may be valid.
// for example, relay may expect a certain header.
protected virtual bool RawReceive(byte[] buffer, out int size, out int connectionId)
{
// NOTE: ReceiveFrom allocates.
// we pass our IPEndPoint to ReceiveFrom.
// receive from calls newClientEP.Create(socketAddr).
// IPEndPoint.Create always returns a new IPEndPoint.
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761
int read = socket.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref newClientEP);
size = socket.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref newClientEP);
// calculate connectionHash from endpoint
// set connectionId to hash from endpoint
// NOTE: IPEndPoint.GetHashCode() allocates.
// it calls m_Address.GetHashCode().
// m_Address is an IPAddress.
@ -187,23 +196,24 @@ protected virtual int RawReceive(byte[] buffer, out int connectionHash)
//
// => using only newClientEP.Port wouldn't work, because
// different connections can have the same port.
connectionHash = newClientEP.GetHashCode();
return read;
connectionId = newClientEP.GetHashCode();
return true;
}
// io - out.
// virtual so it may be modified for relays, nonalloc workaround, etc.
protected virtual void RawSend(ArraySegment<byte> data, EndPoint remoteEndPoint)
// relays may need to prefix connId (and remoteEndPoint would be same for all)
protected virtual void RawSend(int connectionId, ArraySegment<byte> data, EndPoint remoteEndPoint)
{
socket.SendTo(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint);
}
protected virtual KcpServerConnection CreateConnection()
protected virtual KcpServerConnection CreateConnection(int connectionId)
{
// attach EndPoint EP to RawSend.
// kcp needs a simple RawSend(byte[]) function.
Action<ArraySegment<byte>> RawSendWrap =
data => RawSend(data, newClientEP);
data => RawSend(connectionId, data, newClientEP);
KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
return new KcpServerConnection(peer, newClientEP);
@ -219,7 +229,9 @@ void ProcessNext()
// returns amount of bytes written into buffer.
// throws SocketException if datagram was larger than buffer.
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0
int msgLength = RawReceive(rawReceiveBuffer, out int connectionId);
if (!RawReceive(rawReceiveBuffer, out int size, out int connectionId))
return;
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
// is this a new connection?
@ -227,7 +239,7 @@ void ProcessNext()
{
// create a new KcpConnection based on last received
// EndPoint. can be overwritten for where-allocation.
connection = CreateConnection();
connection = CreateConnection(connectionId);
// DO NOT add to connections yet. only if the first message
// is actually the kcp handshake. otherwise it's either:
@ -301,7 +313,7 @@ void ProcessNext()
// connected event was set up.
// tick will process the first message and adds the
// connection if it was the handshake.
connection.peer.RawInput(rawReceiveBuffer, msgLength);
connection.peer.RawInput(rawReceiveBuffer, 0, size);
connection.peer.TickIncoming();
// again, do not add to connections.
@ -311,7 +323,7 @@ void ProcessNext()
// existing connection: simply input the message into kcp
else
{
connection.peer.RawInput(rawReceiveBuffer, msgLength);
connection.peer.RawInput(rawReceiveBuffer, 0, size);
}
}
// this is fine, the socket might have been closed in the other end
@ -325,10 +337,11 @@ void ProcessNext()
}
// process incoming messages. should be called before updating the world.
// virtual because relay may need to inject their own ping or similar.
HashSet<int> connectionsToRemove = new HashSet<int>();
public void TickIncoming()
public virtual void TickIncoming()
{
while (socket != null && socket.Poll(0, SelectMode.SelectRead))
while (RawPoll())
{
ProcessNext();
}
@ -351,7 +364,8 @@ public void TickIncoming()
}
// process outgoing messages. should be called after updating the world.
public void TickOutgoing()
// virtual because relay may need to inject their own ping or similar.
public virtual void TickOutgoing()
{
// flush all server connections
foreach (KcpServerConnection connection in connections.Values)
@ -363,13 +377,13 @@ public void TickOutgoing()
// process incoming and outgoing for convenience.
// => ideally call ProcessIncoming() before updating the world and
// ProcessOutgoing() after updating the world for minimum latency
public void Tick()
public virtual void Tick()
{
TickIncoming();
TickOutgoing();
}
public void Stop()
public virtual void Stop()
{
socket?.Close();
socket = null;

View File

@ -46,25 +46,25 @@ public KcpServerNonAlloc(Action<int> OnConnected,
: new IPEndPointNonAlloc(IPAddress.Any, 0);
}
protected override int RawReceive(byte[] buffer, out int connectionHash)
protected override bool RawReceive(byte[] buffer, out int size, out int connectionId)
{
// where-allocation nonalloc ReceiveFrom.
int read = socket.ReceiveFrom_NonAlloc(buffer, 0, buffer.Length, SocketFlags.None, reusableClientEP);
size = socket.ReceiveFrom_NonAlloc(buffer, 0, buffer.Length, SocketFlags.None, reusableClientEP);
SocketAddress remoteAddress = reusableClientEP.temp;
// where-allocation nonalloc GetHashCode
connectionHash = remoteAddress.GetHashCode();
return read;
connectionId = remoteAddress.GetHashCode();
return true;
}
// make sure to pass IPEndPointNonAlloc as remoteEndPoint
protected override void RawSend(ArraySegment<byte> data, EndPoint remoteEndPoint)
protected override void RawSend(int connectionId, ArraySegment<byte> data, EndPoint remoteEndPoint)
{
// where-allocation nonalloc send
socket.SendTo_NonAlloc(data.Array, data.Offset, data.Count, SocketFlags.None, remoteEndPoint as IPEndPointNonAlloc);
}
protected override KcpServerConnection CreateConnection()
protected override KcpServerConnection CreateConnection(int connectionId)
{
// IPEndPointNonAlloc is reused all the time.
// we can't store that as the connection's endpoint.
@ -78,7 +78,7 @@ protected override KcpServerConnection CreateConnection()
// attach reusable EP to RawSend.
// kcp needs a simple RawSend(byte[]) function.
Action<ArraySegment<byte>> RawSendWrap =
data => RawSend(data, reusableSendEP);
data => RawSend(connectionId, data, reusableSendEP);
KcpPeer peer = new KcpPeer(RawSendWrap, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmits);
return new KcpServerConnection(peer, newClientEP);