feature: Timestamp Batching #2786

This commit is contained in:
vis2k 2021-06-19 22:36:52 +08:00
parent e5771c23a0
commit 820c5a6c44
21 changed files with 346 additions and 167 deletions

View File

@ -4,6 +4,10 @@
//
// IMPORTANT: we use THRESHOLD batching, not MAXED SIZE batching.
// see threshold comments below.
//
// includes timestamp for tick batching.
// -> allows NetworkTransform etc. to use timestamp without including it in
// every single message
using System;
using System.Collections.Generic;
@ -25,6 +29,9 @@ public class Batcher
// they would not contain a timestamp
readonly int threshold;
// TimeStamp header size for those who need it
public const int HeaderSize = sizeof(double);
// batched messages
// IMPORTANT: we queue the serialized messages!
// queueing NetworkMessage would box and allocate!
@ -52,7 +59,7 @@ public void AddMessage(ArraySegment<byte> message)
// batch as many messages as possible into writer
// returns true if any batch was made.
public bool MakeNextBatch(NetworkWriter writer)
public bool MakeNextBatch(NetworkWriter writer, double timeStamp)
{
// if we have no messages then there's nothing to do
if (messages.Count == 0)
@ -62,6 +69,10 @@ public bool MakeNextBatch(NetworkWriter writer)
if (writer.Position != 0)
throw new ArgumentException($"MakeNextBatch needs a fresh writer!");
// write timestamp first
// -> double precision for accuracy over long periods of time
writer.WriteDouble(timeStamp);
// do start no matter what
do
{

View File

@ -1,6 +1,10 @@
// un-batching functionality encapsulated into one class.
// -> less complexity
// -> easy to test
//
// includes timestamp for tick batching.
// -> allows NetworkTransform etc. to use timestamp without including it in
// every single message
using System;
using System.Collections.Generic;
@ -16,21 +20,35 @@ public class Unbatcher
// then pointed to the first batch.
NetworkReader reader = new NetworkReader(new byte[0]);
// timestamp that was written into the batch remotely.
// for the batch that our reader is currently pointed at.
double readerRemoteTimeStamp;
// helper function to start reading a batch.
void StartReadingBatch(PooledNetworkWriter batch)
{
// point reader to it
reader.SetBuffer(batch.ToArraySegment());
// read remote timestamp (double)
// -> AddBatch quarantees that we have at least 8 bytes to read
readerRemoteTimeStamp = reader.ReadDouble();
}
// add a new batch
public void AddBatch(ArraySegment<byte> batch)
// add a new batch.
// returns true if valid.
// returns false if not, in which case the connection should be disconnected.
public bool AddBatch(ArraySegment<byte> batch)
{
// IMPORTANT: ArraySegment is only valid until returning. we copy it!
//
// NOTE: it's not possible to create empty ArraySegments, so we
// don't need to check against that.
// make sure we have at least 8 bytes to read for tick timestamp
if (batch.Count < Batcher.HeaderSize)
return false;
// put into a (pooled) writer
// -> WriteBytes instead of WriteSegment because the latter
// would add a size header. we want to write directly.
@ -45,10 +63,12 @@ public void AddBatch(ArraySegment<byte> batch)
// add batch
batches.Enqueue(writer);
//Debug.Log($"Adding Batch {BitConverter.ToString(batch.Array, batch.Offset, batch.Count)} => batches={batches.Count} reader={reader}");
return true;
}
// get next message, unpacked from batch (if any)
public bool GetNextMessage(out NetworkReader message)
// timestamp is the REMOTE time when the batch was created remotely.
public bool GetNextMessage(out NetworkReader message, out double remoteTimeStamp)
{
// getting messages would be easy via
// <<size, message, size, message, ...>>
@ -71,7 +91,10 @@ public bool GetNextMessage(out NetworkReader message)
// was our reader pointed to anything yet?
if (reader.Length == 0)
{
remoteTimeStamp = 0;
return false;
}
// no more data to read?
if (reader.Remaining == 0)
@ -89,9 +112,17 @@ public bool GetNextMessage(out NetworkReader message)
StartReadingBatch(next);
}
// otherwise there's nothing more to read
else return false;
else
{
remoteTimeStamp = 0;
return false;
}
}
// use the current batch's remote timestamp
// AFTER potentially moving to the next batch ABOVE!
remoteTimeStamp = readerRemoteTimeStamp;
// if we got here, then we have more data to read.
message = reader;
return true;

View File

@ -10,7 +10,7 @@ public class LocalConnectionToClient : NetworkConnectionToClient
{
internal LocalConnectionToServer connectionToServer;
public LocalConnectionToClient() : base(LocalConnectionId, false) {}
public LocalConnectionToClient() : base(LocalConnectionId) {}
public override string address => "localhost";
@ -64,9 +64,6 @@ public class LocalConnectionToServer : NetworkConnectionToServer
internal void QueueConnectedEvent() => connectedEventPending = true;
internal void QueueDisconnectedEvent() => disconnectedEventPending = true;
// parameterless constructor that disables batching for local connections
public LocalConnectionToServer() : base(false) {}
// Send stage two: serialized NetworkMessage as ArraySegment<byte>
internal override void Send(ArraySegment<byte> segment, int channelId = Channels.Reliable)
{
@ -76,8 +73,22 @@ internal override void Send(ArraySegment<byte> segment, int channelId = Channels
return;
}
// handle the server's message directly
NetworkServer.OnTransportData(connectionId, segment, channelId);
// OnTransportData assumes batching.
// so let's make a batch with proper timestamp prefix.
Batcher batcher = GetBatchForChannelId(channelId);
batcher.AddMessage(segment);
// flush it to the server's OnTransportData immediately.
// local connection to server always invokes immediately.
using (PooledNetworkWriter writer = NetworkWriterPool.GetWriter())
{
// make a batch with our local time (double precision)
if (batcher.MakeNextBatch(writer, NetworkTime.localTime))
{
NetworkServer.OnTransportData(connectionId, writer.ToArraySegment(), channelId);
}
else Debug.LogError("Local connection failed to make batch. This should never happen.");
}
}
internal override void Update()
@ -96,9 +107,22 @@ internal override void Update()
{
// call receive on queued writer's content, return to pool
PooledNetworkWriter writer = queue.Dequeue();
ArraySegment<byte> segment = writer.ToArraySegment();
//Debug.Log("Dequeue " + BitConverter.ToString(segment.Array, segment.Offset, segment.Count));
NetworkClient.OnTransportData(segment, Channels.Reliable);
ArraySegment<byte> message = writer.ToArraySegment();
// OnTransportData assumes a proper batch with timestamp etc.
// let's make a proper batch and pass it to OnTransportData.
Batcher batcher = GetBatchForChannelId(Channels.Reliable);
batcher.AddMessage(message);
using (PooledNetworkWriter batchWriter = NetworkWriterPool.GetWriter())
{
// make a batch with our local time (double precision)
if (batcher.MakeNextBatch(batchWriter, NetworkTime.localTime))
{
NetworkClient.OnTransportData(batchWriter.ToArraySegment(), Channels.Reliable);
}
}
NetworkWriterPool.Recycle(writer);
}

View File

@ -14,8 +14,13 @@ public static class MessagePacking
public const int HeaderSize = sizeof(ushort);
// max message content size (without header) calculation for convenience
// -> Transport.GetMaxPacketSize is the raw maximum
// -> Every message gets serialized into <<id, content>>
// -> Every serialized message get put into a batch with a header
public static int MaxContentSize =>
Transport.activeTransport.GetMaxPacketSize() - HeaderSize;
Transport.activeTransport.GetMaxPacketSize()
- HeaderSize
- Batcher.HeaderSize;
public static ushort GetId<T>() where T : struct, NetworkMessage
{

View File

@ -147,7 +147,7 @@ public static void Connect(string address)
connectState = ConnectState.Connecting;
Transport.activeTransport.ClientConnect(address);
connection = new NetworkConnectionToServer(true);
connection = new NetworkConnectionToServer();
}
/// <summary>Connect client to a NetworkServer by Uri.</summary>
@ -163,7 +163,7 @@ public static void Connect(Uri uri)
connectState = ConnectState.Connecting;
Transport.activeTransport.ClientConnect(uri);
connection = new NetworkConnectionToServer(true);
connection = new NetworkConnectionToServer();
}
// TODO why are there two connect host methods?
@ -298,7 +298,12 @@ internal static void OnTransportData(ArraySegment<byte> data, int channelId)
// feed it to the Unbatcher.
// NOTE: we don't need to associate a channelId because we
// always process all messages in the batch.
unbatcher.AddBatch(data);
if (!unbatcher.AddBatch(data))
{
Debug.LogWarning($"NetworkClient: failed to add batch, disconnecting.");
connection.Disconnect();
return;
}
// process all messages in the batch.
// only while NOT loading a scene.
@ -311,11 +316,15 @@ internal static void OnTransportData(ArraySegment<byte> data, int channelId)
// the next time.
// => consider moving processing to NetworkEarlyUpdate.
while (!isLoadingScene &&
unbatcher.GetNextMessage(out NetworkReader reader))
unbatcher.GetNextMessage(out NetworkReader reader, out double remoteTimestamp))
{
// enough to read at least header size?
if (reader.Remaining >= MessagePacking.HeaderSize)
{
// make remoteTimeStamp available to the user
connection.remoteTimeStamp = remoteTimestamp;
// handle message
if (!UnpackAndInvoke(reader, channelId))
break;
}

View File

@ -57,19 +57,23 @@ public abstract class NetworkConnection
// Dictionary<channelId, batch> because we have multiple channels.
protected Dictionary<int, Batcher> batches = new Dictionary<int, Batcher>();
// batch messages and send them out in LateUpdate (or after batchInterval)
protected bool batching;
/// <summary>last batch's remote timestamp. not interpolated. useful for NetworkTransform etc.</summary>
// for any given NetworkMessage/Rpc/Cmd/OnSerialize, this was the time
// on the REMOTE END when it was sent.
//
// NOTE: this is NOT in NetworkTime, it needs to be per-connection
// because the server receives different batch timestamps from
// different connections.
public double remoteTimeStamp { get; internal set; }
internal NetworkConnection(bool batching)
internal NetworkConnection()
{
this.batching = batching;
// set lastTime to current time when creating connection to make
// sure it isn't instantly kicked for inactivity
lastMessageTime = Time.time;
}
internal NetworkConnection(int networkConnectionId, bool batching) : this(batching)
internal NetworkConnection(int networkConnectionId) : this()
{
connectionId = networkConnectionId;
// TODO why isn't lastMessageTime set in here like in the other ctor?
@ -139,27 +143,23 @@ internal virtual void Send(ArraySegment<byte> segment, int channelId = Channels.
{
//Debug.Log("ConnectionSend " + this + " bytes:" + BitConverter.ToString(segment.Array, segment.Offset, segment.Count));
// validate packet size first.
if (ValidatePacketSize(segment, channelId))
{
// batching enabled?
if (batching)
{
// add to batch no matter what.
// batching will try to fit as many as possible into MTU.
// but we still allow > MTU, e.g. kcp max packet size 144kb.
// those are simply sent as single batches.
//
// IMPORTANT: do NOT send > batch sized messages directly:
// - data race: large messages would be sent directly. small
// messages would be sent in the batch at the end of frame
// - timestamps: if batching assumes a timestamp, then large
// messages need that too.
GetBatchForChannelId(channelId).AddMessage(segment);
}
// otherwise send directly to minimize latency
else SendToTransport(segment, channelId);
}
// add to batch no matter what.
// batching will try to fit as many as possible into MTU.
// but we still allow > MTU, e.g. kcp max packet size 144kb.
// those are simply sent as single batches.
//
// IMPORTANT: do NOT send > batch sized messages directly:
// - data race: large messages would be sent directly. small
// messages would be sent in the batch at the end of frame
// - timestamps: if batching assumes a timestamp, then large
// messages need that too.
//
// NOTE: we ALWAYS batch. it's not optional, because the
// receiver needs timestamps for NT etc.
//
// NOTE: we do NOT ValidatePacketSize here yet. the final packet
// will be the full batch, including timestamp.
GetBatchForChannelId(channelId).AddMessage(segment);
}
// Send stage three: hand off to transport
@ -168,34 +168,31 @@ internal virtual void Send(ArraySegment<byte> segment, int channelId = Channels.
// flush batched messages at the end of every Update.
internal virtual void Update()
{
// batching?
if (batching)
// go through batches for all channels
foreach (KeyValuePair<int, Batcher> kvp in batches)
{
// go through batches for all channels
foreach (KeyValuePair<int, Batcher> kvp in batches)
// make and send as many batches as necessary from the stored
// messages.
Batcher batcher = kvp.Value;
using (PooledNetworkWriter writer = NetworkWriterPool.GetWriter())
{
// make and send as many batches as necessary from the stored
// messages.
Batcher batcher = kvp.Value;
using (PooledNetworkWriter writer = NetworkWriterPool.GetWriter())
// make a batch with our local time (double precision)
while (batcher.MakeNextBatch(writer, NetworkTime.localTime))
{
while (batcher.MakeNextBatch(writer))
// validate packet before handing the batch to the
// transport. this guarantees that we always stay
// within transport's max message size limit.
// => just in case transport forgets to check it
// => just in case mirror miscalulated it etc.
ArraySegment<byte> segment = writer.ToArraySegment();
if (ValidatePacketSize(segment, kvp.Key))
{
// validate packet before handing the batch to the
// transport. this guarantees that we always stay
// within transport's max message size limit.
// => just in case transport forgets to check it
// => just in case mirror miscalulated it etc.
ArraySegment<byte> segment = writer.ToArraySegment();
if (ValidatePacketSize(segment, kvp.Key))
{
// send to transport
SendToTransport(segment, kvp.Key);
//UnityEngine.Debug.Log($"sending batch of {writer.Position} bytes for channel={kvp.Key} connId={connectionId}");
// send to transport
SendToTransport(segment, kvp.Key);
//UnityEngine.Debug.Log($"sending batch of {writer.Position} bytes for channel={kvp.Key} connId={connectionId}");
// reset writer for each new batch
writer.Position = 0;
}
// reset writer for each new batch
writer.Position = 0;
}
}
}

View File

@ -10,8 +10,8 @@ public class NetworkConnectionToClient : NetworkConnection
// unbatcher
public Unbatcher unbatcher = new Unbatcher();
public NetworkConnectionToClient(int networkConnectionId, bool batching)
: base(networkConnectionId, batching) {}
public NetworkConnectionToClient(int networkConnectionId)
: base(networkConnectionId) {}
// Send stage three: hand off to transport
protected override void SendToTransport(ArraySegment<byte> segment, int channelId = Channels.Reliable) =>

View File

@ -6,8 +6,6 @@ public class NetworkConnectionToServer : NetworkConnection
{
public override string address => "";
public NetworkConnectionToServer(bool batching) : base(batching) {}
// Send stage three: hand off to transport
protected override void SendToTransport(ArraySegment<byte> segment, int channelId = Channels.Reliable) =>
Transport.activeTransport.ClientSend(segment, channelId);

View File

@ -385,7 +385,7 @@ static void OnTransportConnected(int connectionId)
if (connections.Count < maxConnections)
{
// add connection
NetworkConnectionToClient conn = new NetworkConnectionToClient(connectionId, true);
NetworkConnectionToClient conn = new NetworkConnectionToClient(connectionId);
OnConnected(conn);
}
else
@ -439,7 +439,12 @@ internal static void OnTransportData(int connectionId, ArraySegment<byte> data,
// feed it to the Unbatcher.
// NOTE: we don't need to associate a channelId because we
// always process all messages in the batch.
connection.unbatcher.AddBatch(data);
if (!connection.unbatcher.AddBatch(data))
{
Debug.LogWarning($"NetworkServer: received Message was too short (messages should start with message id)");
connection.Disconnect();
return;
}
// process all messages in the batch.
// only while NOT loading a scene.
@ -452,11 +457,15 @@ internal static void OnTransportData(int connectionId, ArraySegment<byte> data,
// the next time.
// => consider moving processing to NetworkEarlyUpdate.
while (!isLoadingScene &&
connection.unbatcher.GetNextMessage(out NetworkReader reader))
connection.unbatcher.GetNextMessage(out NetworkReader reader, out double remoteTimestamp))
{
// enough to read at least header size?
if (reader.Remaining >= MessagePacking.HeaderSize)
{
// make remoteTimeStamp available to the user
connection.remoteTimeStamp = remoteTimestamp;
// handle message
if (!UnpackAndInvoke(connection, reader, channelId))
break;
}

View File

@ -45,6 +45,8 @@ static NetworkTime()
// after 60 days, accuracy is 454 ms
// in other words, if the server is running for 2 months,
// and you cast down to float, then the time will jump in 0.4s intervals.
//
// TODO consider using Unbatcher's remoteTime for NetworkTime
public static double time => localTime - _offset.Value;
/// <summary>Time measurement variance. The higher, the less accurate the time is.</summary>

View File

@ -4,7 +4,7 @@ namespace Mirror.Tests
{
public class FakeNetworkConnection : NetworkConnectionToClient
{
public FakeNetworkConnection() : base(1, false) {}
public FakeNetworkConnection() : base(1) {}
public override string address => "Test";
public override void Disconnect() {}
internal override void Send(ArraySegment<byte> segment, int channelId = 0) {}

View File

@ -7,9 +7,12 @@ namespace Mirror.Tests.Batching
public class BatcherTests
{
Batcher batcher;
const int Threshold = 4;
const int Threshold = 8 + 4; // 8 bytes timestamp + 4 bytes message
NetworkWriter writer;
// timestamp and serialized timestamp for convenience
const double TimeStamp = Math.PI;
[SetUp]
public void SetUp()
{
@ -17,6 +20,15 @@ public void SetUp()
writer = new NetworkWriter();
}
// helper function to create a batch prefixed by timestamp
public static byte[] ConcatTimestamp(double tickTimeStamp, byte[] data)
{
NetworkWriter writer = new NetworkWriter();
writer.WriteDouble(tickTimeStamp);
writer.WriteBytes(data, 0, data.Length);
return writer.ToArray();
}
[Test]
public void AddMessage()
{
@ -31,7 +43,7 @@ public void MakeNextBatch_OnlyAcceptsFreshWriter()
writer.WriteByte(0);
Assert.Throws<ArgumentException>(() => {
batcher.MakeNextBatch(writer);
batcher.MakeNextBatch(writer, TimeStamp);
});
}
@ -39,7 +51,7 @@ public void MakeNextBatch_OnlyAcceptsFreshWriter()
public void MakeNextBatch_NoMessage()
{
// make batch with no message
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(false));
}
@ -51,9 +63,11 @@ public void MakeNextBatch_OneMessage()
batcher.AddMessage(new ArraySegment<byte>(message));
// make batch
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(message));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, message)));
}
[Test]
@ -63,12 +77,14 @@ public void MakeNextBatch_MultipleMessages_AlmostFullBatch()
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}));
// make batch
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02, 0x03}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02, 0x03})));
// there should be no more batches to make
Assert.That(batcher.MakeNextBatch(writer), Is.False);
Assert.That(batcher.MakeNextBatch(writer, TimeStamp), Is.False);
}
[Test]
@ -78,12 +94,14 @@ public void MakeNextBatch_MultipleMessages_ExactlyFullBatch()
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}));
// make batch
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02, 0x03, 0x04}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02, 0x03, 0x04})));
// there should be no more batches to make
Assert.That(batcher.MakeNextBatch(writer), Is.False);
Assert.That(batcher.MakeNextBatch(writer, TimeStamp), Is.False);
}
[Test]
@ -94,17 +112,21 @@ public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x05}));
// first batch
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02, 0x03, 0x04}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02, 0x03, 0x04})));
// reset writer
writer.Position = 0;
// second batch
result = batcher.MakeNextBatch(writer);
result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x05}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x05})));
}
[Test]
@ -116,25 +138,31 @@ public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x06, 0x07}));
// first batch
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01})));
// reset writer
writer.Position = 0;
// second batch
result = batcher.MakeNextBatch(writer);
result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x02, 0x03, 0x04, 0x05}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x02, 0x03, 0x04, 0x05})));
// reset writer
writer.Position = 0;
// third batch
result = batcher.MakeNextBatch(writer);
result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x06, 0x07}));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x06, 0x07})));
}
// messages > threshold should simply be single batches.
@ -151,9 +179,9 @@ public void MakeNextBatch_LargerThanThreshold()
batcher.AddMessage(new ArraySegment<byte>(large));
// result should be only the large message
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(large));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, large)));
}
// messages > threshold should simply be single batches.
@ -178,25 +206,25 @@ public void MakeNextBatch_LargerThanThreshold_BetweenSmallerMessages()
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04}));
// first batch should be the two small messages
bool result = batcher.MakeNextBatch(writer);
bool result = batcher.MakeNextBatch(writer, TimeStamp);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02}));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02})));
// reset writer
writer.Position = 0;
// second batch should be only the large message
result = batcher.MakeNextBatch(writer);
result = batcher.MakeNextBatch(writer, TimeStamp + 1);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(large));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp + 1, large)));
// reset writer
writer.Position = 0;
// third batch be the two small messages
result = batcher.MakeNextBatch(writer);
result = batcher.MakeNextBatch(writer, TimeStamp + 2);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x03, 0x04}));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp + 2, new byte[]{0x03, 0x04})));
}
}
}

View File

@ -6,6 +6,7 @@ namespace Mirror.Tests.Batching
public class UnbatcherTests
{
Unbatcher unbatcher;
const double TimeStamp = Math.PI;
[SetUp]
public void SetUp()
@ -16,7 +17,7 @@ public void SetUp()
[Test]
public void GetNextMessage_NoBatches()
{
bool result = unbatcher.GetNextMessage(out NetworkReader _);
bool result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
@ -24,21 +25,23 @@ public void GetNextMessage_NoBatches()
public void GetNextMessage_OneBatch()
{
// add one batch
byte[] batch = {0x01, 0x02};
byte[] batch = BatcherTests.ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(batch));
// get next message, read first byte
bool result = unbatcher.GetNextMessage(out NetworkReader reader);
bool result = unbatcher.GetNextMessage(out NetworkReader reader, out double remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// get next message, read last byte
result = unbatcher.GetNextMessage(out reader);
result = unbatcher.GetNextMessage(out reader, out remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// there should be no more messages
result = unbatcher.GetNextMessage(out _);
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
@ -46,27 +49,29 @@ public void GetNextMessage_OneBatch()
public void GetNextMessage_MultipleBatches()
{
// add first batch
byte[] firstBatch = {0x01, 0x02};
byte[] firstBatch = BatcherTests.ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(firstBatch));
// add second batch
byte[] secondBatch = {0x03, 0x04};
byte[] secondBatch = BatcherTests.ConcatTimestamp(TimeStamp + 1, new byte[]{0x03, 0x04});
unbatcher.AddBatch(new ArraySegment<byte>(secondBatch));
// get next message, read everything
bool result = unbatcher.GetNextMessage(out NetworkReader reader);
bool result = unbatcher.GetNextMessage(out NetworkReader reader, out double remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// get next message, should point to next batch
result = unbatcher.GetNextMessage(out reader);
// get next message, should point to next batch at Timestamp + 1
result = unbatcher.GetNextMessage(out reader, out remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x03));
Assert.That(reader.ReadByte(), Is.EqualTo(0x04));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp + 1));
// there should be no more messages
result = unbatcher.GetNextMessage(out _);
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
@ -80,29 +85,31 @@ public void GetNextMessage_MultipleBatches()
public void RetireBatchAndTryNewBatch()
{
// add first batch
byte[] firstBatch = {0x01, 0x02};
byte[] firstBatch = BatcherTests.ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(firstBatch));
// read everything
bool result = unbatcher.GetNextMessage(out NetworkReader reader);
bool result = unbatcher.GetNextMessage(out NetworkReader reader, out double remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// try to read again.
// reader will be at limit, which should retire the batch.
result = unbatcher.GetNextMessage(out reader);
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
// add new batch
byte[] secondBatch = {0x03, 0x04};
byte[] secondBatch = BatcherTests.ConcatTimestamp(TimeStamp + 1, new byte[]{0x03, 0x04});
unbatcher.AddBatch(new ArraySegment<byte>(secondBatch));
// read everything
result = unbatcher.GetNextMessage(out reader);
result = unbatcher.GetNextMessage(out reader, out remoteTimeStamp);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x03));
Assert.That(reader.ReadByte(), Is.EqualTo(0x04));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp + 1));
}
}
}

View File

@ -21,7 +21,7 @@ public override void SetUp()
// A with connectionId = 0x0A, netId = 0xAA
CreateNetworked(out gameObjectA, out identityA);
connectionA = new NetworkConnectionToClient(0x0A, false);
connectionA = new NetworkConnectionToClient(0x0A);
connectionA.isAuthenticated = true;
connectionA.isReady = true;
connectionA.identity = identityA;
@ -29,7 +29,7 @@ public override void SetUp()
// B
CreateNetworked(out gameObjectB, out identityB);
connectionB = new NetworkConnectionToClient(0x0B, false);
connectionB = new NetworkConnectionToClient(0x0B);
connectionB.isAuthenticated = true;
connectionB.isReady = true;
connectionB.identity = identityB;

View File

@ -445,7 +445,7 @@ public void SendTargetRPCInternal()
// calling rpc on connectionToServer shouldn't work
LogAssert.Expect(LogType.Error, $"TargetRPC {nameof(NetworkBehaviourSendTargetRPCInternalComponent.TargetRPCGenerated)} requires a NetworkConnectionToClient but was given {typeof(NetworkConnectionToServer).Name}");
comp.CallSendTargetRPCInternal(new NetworkConnectionToServer(false));
comp.CallSendTargetRPCInternal(new NetworkConnectionToServer());
Assert.That(comp.called, Is.EqualTo(0));
// set proper connection to client

View File

@ -30,24 +30,11 @@ public override void TearDown()
base.TearDown();
}
[Test]
public void Send_WithoutBatching_SendsImmediately()
{
// create connection and send
NetworkConnectionToClient connection = new NetworkConnectionToClient(42, false);
byte[] message = {0x01, 0x02};
connection.Send(new ArraySegment<byte>(message));
// Send() should send immediately, not only in server.update flushing
UpdateTransport();
Assert.That(clientReceived.Count, Is.EqualTo(1));
}
[Test]
public void Send_BatchesUntilUpdate()
{
// create connection and send
NetworkConnectionToClient connection = new NetworkConnectionToClient(42, true);
NetworkConnectionToClient connection = new NetworkConnectionToClient(42);
byte[] message = {0x01, 0x02};
connection.Send(new ArraySegment<byte>(message));
@ -71,8 +58,11 @@ public void Send_BatchesUntilUpdate()
[Test]
public void SendBatchingResetsPreviousWriter()
{
// batching adds 8 byte timestamp header
const int BatchHeader = 8;
// create connection
NetworkConnectionToClient connection = new NetworkConnectionToClient(42, true);
NetworkConnectionToClient connection = new NetworkConnectionToClient(42);
// send and update big message
byte[] message = {0x01, 0x02};
@ -80,9 +70,9 @@ public void SendBatchingResetsPreviousWriter()
connection.Update();
UpdateTransport();
Assert.That(clientReceived.Count, Is.EqualTo(1));
Assert.That(clientReceived[0].Length, Is.EqualTo(2));
Assert.That(clientReceived[0][0], Is.EqualTo(0x01));
Assert.That(clientReceived[0][1], Is.EqualTo(0x02));
Assert.That(clientReceived[0].Length, Is.EqualTo(BatchHeader + 2));
Assert.That(clientReceived[0][BatchHeader + 0], Is.EqualTo(0x01));
Assert.That(clientReceived[0][BatchHeader + 1], Is.EqualTo(0x02));
// clear previous
clientReceived.Clear();
@ -93,8 +83,8 @@ public void SendBatchingResetsPreviousWriter()
connection.Update();
UpdateTransport();
Assert.That(clientReceived.Count, Is.EqualTo(1));
Assert.That(clientReceived[0].Length, Is.EqualTo(1));
Assert.That(clientReceived[0][0], Is.EqualTo(0xFF));
Assert.That(clientReceived[0].Length, Is.EqualTo(BatchHeader + 1));
Assert.That(clientReceived[0][BatchHeader + 0], Is.EqualTo(0xFF));
}
}
}

View File

@ -414,11 +414,11 @@ public void RemoveObserverInternal()
identity.OnStartServer();
// add an observer connection
NetworkConnectionToClient connection = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient connection = new NetworkConnectionToClient(42);
identity.observers[connection.connectionId] = connection;
// RemoveObserverInternal with invalid connection should do nothing
identity.RemoveObserverInternal(new NetworkConnectionToClient(43, false));
identity.RemoveObserverInternal(new NetworkConnectionToClient(43));
Assert.That(identity.observers.Count, Is.EqualTo(1));
// RemoveObserverInternal with existing connection should remove it
@ -622,7 +622,7 @@ public void AssignAndRemoveClientAuthority()
// another connection
// error log is expected
LogAssert.ignoreFailingMessages = true;
result = identity.AssignClientAuthority(new NetworkConnectionToClient(43, false));
result = identity.AssignClientAuthority(new NetworkConnectionToClient(43));
LogAssert.ignoreFailingMessages = false;
Assert.That(result, Is.False);
Assert.That(identity.connectionToClient, Is.EqualTo(owner));
@ -987,8 +987,8 @@ public void AddObserver()
CreateNetworked(out GameObject _, out NetworkIdentity identity);
// create some connections
NetworkConnectionToClient connection1 = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient connection2 = new NetworkConnectionToClient(43, false);
NetworkConnectionToClient connection1 = new NetworkConnectionToClient(42);
NetworkConnectionToClient connection2 = new NetworkConnectionToClient(43);
// AddObserver should return early if called before .observers was
// created
@ -1012,7 +1012,7 @@ public void AddObserver()
Assert.That(identity.observers[connection2.connectionId], Is.EqualTo(connection2));
// adding a duplicate connectionId shouldn't overwrite the original
NetworkConnectionToClient duplicate = new NetworkConnectionToClient(connection1.connectionId, false);
NetworkConnectionToClient duplicate = new NetworkConnectionToClient(connection1.connectionId);
identity.AddObserver(duplicate);
Assert.That(identity.observers.Count, Is.EqualTo(2));
Assert.That(identity.observers.ContainsKey(connection1.connectionId));
@ -1030,8 +1030,8 @@ public void ClearObservers()
identity.OnStartServer();
// add some observers
identity.observers[42] = new NetworkConnectionToClient(42, false);
identity.observers[43] = new NetworkConnectionToClient(43, false);
identity.observers[42] = new NetworkConnectionToClient(42);
identity.observers[43] = new NetworkConnectionToClient(43);
// call ClearObservers
identity.ClearObservers();
@ -1111,9 +1111,9 @@ public void Reset()
identity.isClient = true;
// creates .observers and generates a netId
identity.OnStartServer();
identity.connectionToClient = new NetworkConnectionToClient(1, false);
identity.connectionToServer = new NetworkConnectionToServer(false);
identity.observers[43] = new NetworkConnectionToClient(2, false);
identity.connectionToClient = new NetworkConnectionToClient(1);
identity.connectionToServer = new NetworkConnectionToServer();
identity.observers[43] = new NetworkConnectionToClient(2);
// mark for reset and reset
identity.Reset();

View File

@ -43,7 +43,7 @@ public override void SetUp()
static NetworkConnection CreateNetworkConnection(GameObject player)
{
NetworkConnectionToClient connection = new NetworkConnectionToClient(++nextConnectionId, false);
NetworkConnectionToClient connection = new NetworkConnectionToClient(++nextConnectionId);
connection.identity = player.GetComponent<NetworkIdentity>();
connection.identity.connectionToClient = connection;
connection.identity.observers = new Dictionary<int, NetworkConnection>();

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading;
using NUnit.Framework;
using UnityEngine;
using UnityEngine.TestTools;
@ -240,13 +241,13 @@ public void AddConnection()
NetworkServer.Listen(1);
// add first connection
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42);
Assert.That(NetworkServer.AddConnection(conn42), Is.True);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(1));
Assert.That(NetworkServer.connections[42], Is.EqualTo(conn42));
// add second connection
NetworkConnectionToClient conn43 = new NetworkConnectionToClient(43, false);
NetworkConnectionToClient conn43 = new NetworkConnectionToClient(43);
Assert.That(NetworkServer.AddConnection(conn43), Is.True);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(2));
Assert.That(NetworkServer.connections[42], Is.EqualTo(conn42));
@ -260,13 +261,13 @@ public void AddConnection_PreventsDuplicates()
NetworkServer.Listen(1);
// add a connection
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42);
Assert.That(NetworkServer.AddConnection(conn42), Is.True);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(1));
Assert.That(NetworkServer.connections[42], Is.EqualTo(conn42));
// add duplicate connectionId
NetworkConnectionToClient connDup = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient connDup = new NetworkConnectionToClient(42);
Assert.That(NetworkServer.AddConnection(connDup), Is.False);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(1));
Assert.That(NetworkServer.connections[42], Is.EqualTo(conn42));
@ -279,7 +280,7 @@ public void RemoveConnection()
NetworkServer.Listen(1);
// add connection
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42);
Assert.That(NetworkServer.AddConnection(conn42), Is.True);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(1));
@ -295,7 +296,7 @@ public void DisconnectAllTest_RemoteConnection()
NetworkServer.Listen(1);
// add connection
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42, false);
NetworkConnectionToClient conn42 = new NetworkConnectionToClient(42);
NetworkServer.AddConnection(conn42);
Assert.That(NetworkServer.connections.Count, Is.EqualTo(1));
@ -568,6 +569,74 @@ public void Send_ServerToClientMessage_LargerThanBatchThreshold_SentInOrder()
Assert.That(received[1], Is.EqualTo("big"));
}
// make sure NetworkConnection.remoteTimeStamp is always the time on the
// remote end when the message was sent
[Test]
public void Send_ClientToServerMessage_SetsRemoteTimeStamp()
{
// register a message handler
int called = 0;
NetworkServer.RegisterHandler<TestMessage1>((conn, msg) => ++called, false);
// listen & connect a client
NetworkServer.Listen(1);
ConnectClientBlocking(out NetworkConnectionToClient connectionToClient);
// send message
NetworkClient.Send(new TestMessage1());
// remember current time & update NetworkClient IMMEDIATELY so the
// batch is finished with timestamp.
double sendTime = NetworkTime.localTime;
NetworkClient.NetworkLateUpdate();
// let some time pass before processing
const int waitTime = 100;
Thread.Sleep(waitTime);
ProcessMessages();
// is the remote timestamp set to when we sent it?
// remember the time when we sent the message
// (within 1/10th of the time we waited. we need some tolerance
// because we don't capture NetworkTime.localTime exactly when we
// finish the batch. but the difference should not be > 'waitTime')
Assert.That(called, Is.EqualTo(1));
Assert.That(connectionToClient.remoteTimeStamp, Is.EqualTo(sendTime).Within(waitTime / 10));
}
[Test]
public void Send_ServerToClientMessage_SetsRemoteTimeStamp()
{
// register a message handler
int called = 0;
NetworkClient.RegisterHandler<TestMessage1>(msg => ++called, false);
// listen & connect a client
NetworkServer.Listen(1);
ConnectClientBlocking(out NetworkConnectionToClient connectionToClient);
// send message
connectionToClient.Send(new TestMessage1());
// remember current time & update NetworkClient IMMEDIATELY so the
// batch is finished with timestamp.
double sendTime = NetworkTime.localTime;
NetworkServer.NetworkLateUpdate();
// let some time pass before processing
const int waitTime = 100;
Thread.Sleep(waitTime);
ProcessMessages();
// is the remote timestamp set to when we sent it?
// remember the time when we sent the message
// (within 1/10th of the time we waited. we need some tolerance
// because we don't capture NetworkTime.localTime exactly when we
// finish the batch. but the difference should not be > 'waitTime')
Assert.That(called, Is.EqualTo(1));
Assert.That(NetworkClient.connection.remoteTimeStamp, Is.EqualTo(sendTime).Within(waitTime / 10));
}
[Test]
public void OnDataReceivedInvalidConnectionId()
{

View File

@ -112,7 +112,6 @@ public void ErrorForTargetRpcWhenNotGivenConnectionToClient()
class FakeConnection : NetworkConnection
{
public override string address => throw new NotImplementedException();
public FakeConnection() : base(false) {}
public override void Disconnect() => throw new NotImplementedException();
internal override void Send(ArraySegment<byte> segment, int channelId = 0) => throw new NotImplementedException();
protected override void SendToTransport(ArraySegment<byte> segment, int channelId = Channels.Reliable) => throw new NotImplementedException();

View File

@ -29,7 +29,7 @@ public override IEnumerator UnityTearDown()
public IEnumerator DestroyPlayerForConnectionTest()
{
GameObject player = new GameObject("testPlayer", typeof(NetworkIdentity));
NetworkConnectionToClient conn = new NetworkConnectionToClient(1, false);
NetworkConnectionToClient conn = new NetworkConnectionToClient(1);
NetworkServer.AddPlayerForConnection(conn, player);
@ -48,7 +48,7 @@ public IEnumerator DestroyPlayerForConnectionTest()
public IEnumerator RemovePlayerForConnectionTest()
{
GameObject player = new GameObject("testPlayer", typeof(NetworkIdentity));
NetworkConnectionToClient conn = new NetworkConnectionToClient(1, false);
NetworkConnectionToClient conn = new NetworkConnectionToClient(1);
NetworkServer.AddPlayerForConnection(conn, player);