diff --git a/Assets/Mirror/Runtime/Batching/Batcher.cs b/Assets/Mirror/Runtime/Batching/Batcher.cs index 4d6138a8e..3a713f7bf 100644 --- a/Assets/Mirror/Runtime/Batching/Batcher.cs +++ b/Assets/Mirror/Runtime/Batching/Batcher.cs @@ -1,6 +1,9 @@ // batching functionality encapsulated into one class. // -> less complexity // -> easy to test +// +// IMPORTANT: we use THRESHOLD batching, not MAXED SIZE batching. +// see threshold comments below. using System; using System.Collections.Generic; @@ -8,30 +11,35 @@ namespace Mirror { public class Batcher { - // max batch size - readonly int MaxBatchSize; + // batching threshold instead of max size. + // -> small messages are fit into threshold sized batches + // -> messages larger than threshold are single batches + // + // in other words, we fit up to 'threshold' but still allow larger ones + // for two reasons: + // 1.) data races: skipping batching for larger messages would send a + // large spawn message immediately, while others are batched and + // only flushed at the end of the frame + // 2) timestamp batching: if each batch is expected to contain a + // timestamp, then large messages have to be a batch too. otherwise + // they would not contain a timestamp + readonly int threshold; // batched messages // IMPORTANT: we queue the serialized messages! // queueing NetworkMessage would box and allocate! Queue messages = new Queue(); - public Batcher(int MaxBatchSize) + public Batcher(int threshold) { - this.MaxBatchSize = MaxBatchSize; + this.threshold = threshold; } // add a message for batching - // -> true if it worked. - // -> false if too big for max. - // => true/false instead of exception because the user might try to send - // a gigantic message once. which is fine. but we won't batch it. - public bool AddMessage(ArraySegment message) + // we allow any sized messages. + // caller needs to make sure they are within max packet size. 
+ public void AddMessage(ArraySegment message) { - // make sure the message can fit into max batch size - if (message.Count > MaxBatchSize) - return false; - // put into a (pooled) writer // -> WriteBytes instead of WriteSegment because the latter // would add a size header. we want to write directly. @@ -40,7 +48,6 @@ public bool AddMessage(ArraySegment message) PooledNetworkWriter writer = NetworkWriterPool.GetWriter(); writer.WriteBytes(message.Array, message.Offset, message.Count); messages.Enqueue(writer); - return true; } // batch as many messages as possible into writer @@ -55,27 +62,22 @@ public bool MakeNextBatch(NetworkWriter writer) if (writer.Position != 0) throw new ArgumentException($"MakeNextBatch needs a fresh writer!"); - // for each queued message - while (messages.Count > 0) + // do start no matter what + do { - // peek and see if it still fits - PooledNetworkWriter message = messages.Peek(); + // add next message no matter what. even if > threshold. + // (we do allow > threshold sized messages as single batch) + PooledNetworkWriter message = messages.Dequeue(); ArraySegment segment = message.ToArraySegment(); + writer.WriteBytes(segment.Array, segment.Offset, segment.Count); - // still fits? - if (writer.Position + segment.Count <= MaxBatchSize) - { - // add it - // (without any size prefixes. we can fit exactly segment.count!) - writer.WriteBytes(segment.Array, segment.Offset, segment.Count); - - // eat it & return to pool - messages.Dequeue(); - NetworkWriterPool.Recycle(message); - } - // doesn't fit. this batch is done - else break; + // return the writer to pool + NetworkWriterPool.Recycle(message); } + // keep going as long as we have more messages, + // AND the next one would fit into threshold. 
+ while (messages.Count > 0 && + writer.Position + messages.Peek().Position <= threshold); // we had messages, so a batch was made return true; diff --git a/Assets/Mirror/Runtime/NetworkConnectionToClient.cs b/Assets/Mirror/Runtime/NetworkConnectionToClient.cs index 8958d1122..cdfe1c06f 100644 --- a/Assets/Mirror/Runtime/NetworkConnectionToClient.cs +++ b/Assets/Mirror/Runtime/NetworkConnectionToClient.cs @@ -26,13 +26,17 @@ internal override void Send(ArraySegment segment, int channelId = Channels // batching enabled? if (batching) { - // try to batch, or send directly if too big. - // (user might try to send a max message sized message, - // where max message size is larger than max batch size. - // for example, kcp2k max message size is 144 KB but we - // only want to batch MTU each time) - if (!GetBatchForChannelId(channelId).AddMessage(segment)) - Transport.activeTransport.ServerSend(connectionId, segment, channelId); + // add to batch no matter what. + // batching will try to fit as many as possible into MTU. + // but we still allow > MTU, e.g. kcp max packet size 144kb. + // those are simply sent as single batches. + // + // IMPORTANT: do NOT send > batch sized messages directly: + // - data race: large messages would be sent directly. small + // messages would be sent in the batch at the end of frame + // - timestamps: if batching assumes a timestamp, then large + // messages need that too. + GetBatchForChannelId(channelId).AddMessage(segment); } // otherwise send directly to minimize latency else Transport.activeTransport.ServerSend(connectionId, segment, channelId); diff --git a/Assets/Mirror/Runtime/NetworkConnectionToServer.cs b/Assets/Mirror/Runtime/NetworkConnectionToServer.cs index c17579b0a..3d8427f0d 100644 --- a/Assets/Mirror/Runtime/NetworkConnectionToServer.cs +++ b/Assets/Mirror/Runtime/NetworkConnectionToServer.cs @@ -21,13 +21,17 @@ internal override void Send(ArraySegment segment, int channelId = Channels // batching enabled? 
if (batching) { - // try to batch, or send directly if too big. - // (user might try to send a max message sized message, - // where max message size is larger than max batch size. - // for example, kcp2k max message size is 144 KB but we - // only want to batch MTU each time) - if (!GetBatchForChannelId(channelId).AddMessage(segment)) - Transport.activeTransport.ClientSend(segment, channelId); + // add to batch no matter what. + // batching will try to fit as many as possible into MTU. + // but we still allow > MTU, e.g. kcp max packet size 144kb. + // those are simply sent as single batches. + // + // IMPORTANT: do NOT send > batch sized messages directly: + // - data race: large messages would be sent directly. small + // messages would be sent in the batch at the end of frame + // - timestamps: if batching assumes a timestamp, then large + // messages need that too. + GetBatchForChannelId(channelId).AddMessage(segment); } // otherwise send directly to minimize latency else Transport.activeTransport.ClientSend(segment, channelId); diff --git a/Assets/Mirror/Tests/Editor/Batching/BatcherTests.cs b/Assets/Mirror/Tests/Editor/Batching/BatcherTests.cs index 9bcfed633..638082cd9 100644 --- a/Assets/Mirror/Tests/Editor/Batching/BatcherTests.cs +++ b/Assets/Mirror/Tests/Editor/Batching/BatcherTests.cs @@ -7,30 +7,21 @@ namespace Mirror.Tests.Batching public class BatcherTests { Batcher batcher; - const int MaxBatchSize = 4; + const int Threshold = 4; NetworkWriter writer; [SetUp] public void SetUp() { - batcher = new Batcher(MaxBatchSize); + batcher = new Batcher(Threshold); writer = new NetworkWriter(); } [Test] - public void AddMessage_AddsToQueue() + public void AddMessage() { byte[] message = {0x01, 0x02}; - bool result = batcher.AddMessage(new ArraySegment(message)); - Assert.That(result, Is.True); - } - - [Test] - public void AddMessage_DetectsTooBig() - { - byte[] message = new byte[MaxBatchSize + 1]; - bool result = batcher.AddMessage(new 
ArraySegment(message)); - Assert.That(result, Is.False); + batcher.AddMessage(new ArraySegment(message)); } [Test] @@ -66,7 +57,7 @@ public void MakeNextBatch_OneMessage() } [Test] - public void MakeNextBatch_MultipleMessages_AlmostMaxBatchSize() + public void MakeNextBatch_MultipleMessages_AlmostFullBatch() { batcher.AddMessage(new ArraySegment(new byte[]{0x01, 0x02})); batcher.AddMessage(new ArraySegment(new byte[]{0x03})); @@ -81,7 +72,7 @@ public void MakeNextBatch_MultipleMessages_AlmostMaxBatchSize() } [Test] - public void MakeNextBatch_MultipleMessages_ExactlyMaxBatchSize() + public void MakeNextBatch_MultipleMessages_ExactlyFullBatch() { batcher.AddMessage(new ArraySegment(new byte[]{0x01, 0x02})); batcher.AddMessage(new ArraySegment(new byte[]{0x03, 0x04})); @@ -96,7 +87,7 @@ public void MakeNextBatch_MultipleMessages_ExactlyMaxBatchSize() } [Test] - public void MakeNextBatch_MultipleMessages_LargerMaxBatchSize() + public void MakeNextBatch_MultipleMessages_MoreThanOneBatch() { batcher.AddMessage(new ArraySegment(new byte[]{0x01, 0x02})); batcher.AddMessage(new ArraySegment(new byte[]{0x03, 0x04})); @@ -145,5 +136,67 @@ public void MakeNextBatch_MultipleMessages_Small_Giant_Small() Assert.That(result, Is.EqualTo(true)); Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x06, 0x07})); } + + // messages > threshold should simply be single batches. 
+ // those need to be supported too, for example: + // kcp prefers MTU sized batches + // but we still allow up to 144 KB max message size + [Test] + public void MakeNextBatch_LargerThanThreshold() + { + // make a larger than threshold message + byte[] large = new byte[Threshold + 1]; + for (int i = 0; i < Threshold + 1; ++i) + large[i] = (byte)i; + batcher.AddMessage(new ArraySegment<byte>(large)); + + // result should be only the large message + bool result = batcher.MakeNextBatch(writer); + Assert.That(result, Is.EqualTo(true)); + Assert.That(writer.ToArray().SequenceEqual(large)); + } + + // messages > threshold should simply be single batches. + // those need to be supported too, for example: + // kcp prefers MTU sized batches + // but we still allow up to 144 KB max message size + [Test] + public void MakeNextBatch_LargerThanThreshold_BetweenSmallerMessages() + { + // make a larger than threshold message + byte[] large = new byte[Threshold + 1]; + for (int i = 0; i < Threshold + 1; ++i) + large[i] = (byte)i; + + // add two small, one large, two small messages. + // to make sure everything around it is still batched, + // and the large one is a separate batch. 
+ batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01})); + batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02})); + batcher.AddMessage(new ArraySegment<byte>(large)); + batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03})); + batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04})); + + // first batch should be the two small messages + bool result = batcher.MakeNextBatch(writer); + Assert.That(result, Is.EqualTo(true)); + Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02})); + + // reset writer + writer.Position = 0; + + // second batch should be only the large message + result = batcher.MakeNextBatch(writer); + Assert.That(result, Is.EqualTo(true)); + Assert.That(writer.ToArray().SequenceEqual(large)); + + // reset writer + writer.Position = 0; + + // third batch should be the two small messages + result = batcher.MakeNextBatch(writer); + Assert.That(result, Is.EqualTo(true)); + Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x03, 0x04})); + } } }