fix: Batcher 'MaxBatchSize' changed to 'Threshold' to support larger messages too. Larger messages simply become single large batches, while smaller ones are still packed into batches up to 'Threshold'. Fixes a data race where larger messages would be sent immediately while smaller ones would be sent at the end of the frame. Prepares for timestamp batching, which assumes a timestamp prefix for ALL batches (including larger ones, which previously would not be treated as a batch) (#2787)


* add comment

* better comment

* do-while

* better comment
vis2k 2021-06-17 19:08:50 +08:00 committed by GitHub
parent 69d739a5cd
commit 2a563b96df
4 changed files with 124 additions and 61 deletions
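
For readers skimming the diff, the behavioral change boils down to one rule: nothing is ever rejected for size anymore. A minimal sketch of that rule in plain C# (hypothetical ThresholdBatcher type and names, not Mirror's actual code):

    using System;
    using System.Collections.Generic;

    // Hypothetical, simplified model of threshold batching (NOT Mirror's code):
    // every message is accepted. Small ones are packed together up to
    // 'threshold'; anything larger simply becomes a single, oversized batch.
    class ThresholdBatcher
    {
        readonly int threshold;
        readonly Queue<byte[]> messages = new Queue<byte[]>();

        public ThresholdBatcher(int threshold) => this.threshold = threshold;

        // always accepted: there is no 'too big' rejection path anymore
        public void AddMessage(byte[] message) => messages.Enqueue(message);

        // false if there is nothing to batch
        public bool TryMakeNextBatch(List<byte> batch)
        {
            if (messages.Count == 0)
                return false;

            // do-while: the first message is added no matter what,
            // even if it alone exceeds the threshold
            do
            {
                batch.AddRange(messages.Dequeue());
            }
            while (messages.Count > 0 &&
                   batch.Count + messages.Peek().Length <= threshold);
            return true;
        }
    }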

Batcher.cs

@@ -1,6 +1,9 @@
// batching functionality encapsulated into one class.
// -> less complexity
// -> easy to test
//
// IMPORTANT: we use THRESHOLD batching, not MAXED SIZE batching.
// see threshold comments below.
using System;
using System.Collections.Generic;
@@ -8,30 +11,35 @@ namespace Mirror
{
public class Batcher
{
// max batch size
readonly int MaxBatchSize;
// batching threshold instead of max size.
// -> small messages are fit into threshold sized batches
// -> messages larger than threshold are single batches
//
// in other words, we fit up to 'threshold' but still allow larger ones
// for two reasons:
// 1.) data races: skipping batching for larger messages would send a
// large spawn message immediately, while others are batched and
// only flushed at the end of the frame
// 2) timestamp batching: if each batch is expected to contain a
// timestamp, then large messages have to be a batch too. otherwise
// they would not contain a timestamp
readonly int threshold;
// batched messages
// IMPORTANT: we queue the serialized messages!
// queueing NetworkMessage would box and allocate!
Queue<PooledNetworkWriter> messages = new Queue<PooledNetworkWriter>();
public Batcher(int MaxBatchSize)
public Batcher(int threshold)
{
this.MaxBatchSize = MaxBatchSize;
this.threshold = threshold;
}
// add a message for batching
// -> true if it worked.
// -> false if too big for max.
// => true/false instead of exception because the user might try to send
// a gigantic message once. which is fine. but we won't batch it.
public bool AddMessage(ArraySegment<byte> message)
// we allow any sized messages.
// caller needs to make sure they are within max packet size.
public void AddMessage(ArraySegment<byte> message)
{
// make sure the message can fit into max batch size
if (message.Count > MaxBatchSize)
return false;
// put into a (pooled) writer
// -> WriteBytes instead of WriteSegment because the latter
// would add a size header. we want to write directly.
@@ -40,7 +48,6 @@ public bool AddMessage(ArraySegment<byte> message)
PooledNetworkWriter writer = NetworkWriterPool.GetWriter();
writer.WriteBytes(message.Array, message.Offset, message.Count);
messages.Enqueue(writer);
return true;
}
// batch as many messages as possible into writer
@@ -55,27 +62,22 @@ public bool MakeNextBatch(NetworkWriter writer)
if (writer.Position != 0)
throw new ArgumentException($"MakeNextBatch needs a fresh writer!");
// for each queued message
while (messages.Count > 0)
// do start no matter what
do
{
// peek and see if it still fits
PooledNetworkWriter message = messages.Peek();
// add next message no matter what. even if > threshold.
// (we do allow > threshold sized messages as single batch)
PooledNetworkWriter message = messages.Dequeue();
ArraySegment<byte> segment = message.ToArraySegment();
// still fits?
if (writer.Position + segment.Count <= MaxBatchSize)
{
// add it
// (without any size prefixes. we can fit exactly segment.count!)
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
// eat it & return to pool
messages.Dequeue();
// return the writer to pool
NetworkWriterPool.Recycle(message);
}
// doesn't fit. this batch is done
else break;
}
// keep going as long as we have more messages,
// AND the next one would fit into threshold.
while (messages.Count > 0 &&
writer.Position + messages.Peek().Position <= threshold);
// we had messages, so a batch was made
return true;
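
The net effect on callers of MakeNextBatch is unchanged: drain until it returns false, resetting the writer between batches. Roughly like this (an illustrative flush loop built from the pool API visible in this diff; the loop itself is not part of this commit):

    // illustrative end-of-frame flush (sketch, not part of this diff)
    PooledNetworkWriter writer = NetworkWriterPool.GetWriter();
    while (batcher.MakeNextBatch(writer))
    {
        // send this batch; > threshold messages arrive as single batches
        Transport.activeTransport.ServerSend(connectionId, writer.ToArraySegment(), channelId);
        // MakeNextBatch requires a fresh writer
        writer.Position = 0;
    }
    NetworkWriterPool.Recycle(writer);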

NetworkConnectionToClient.cs

@@ -26,13 +26,17 @@ internal override void Send(ArraySegment<byte> segment, int channelId = Channels
// batching enabled?
if (batching)
{
// try to batch, or send directly if too big.
// (user might try to send a max message sized message,
// where max message size is larger than max batch size.
// for example, kcp2k max message size is 144 KB but we
// only want to batch MTU each time)
if (!GetBatchForChannelId(channelId).AddMessage(segment))
Transport.activeTransport.ServerSend(connectionId, segment, channelId);
// add to batch no matter what.
// batching will try to fit as many as possible into MTU.
// but we still allow > MTU, e.g. kcp max packet size 144kb.
// those are simply sent as single batches.
//
// IMPORTANT: do NOT send > batch sized messages directly:
// - data race: large messages would be sent directly. small
// messages would be sent in the batch at the end of frame
// - timestamps: if batching assumes a timestamp, then large
// messages need that too.
GetBatchForChannelId(channelId).AddMessage(segment);
}
// otherwise send directly to minimize latency
else Transport.activeTransport.ServerSend(connectionId, segment, channelId);
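
The "data race" named in the comment above is a message ordering problem, not a memory race. A toy, self-contained model of the old behavior shows how a later large message could overtake an earlier small one (hypothetical OldStyleSender, strings instead of byte payloads):

    using System;
    using System.Collections.Generic;

    // Toy demonstration of the ordering bug (hypothetical, not Mirror code).
    // With a hard max size, 'Send' bypasses the queue for large messages:
    class OldStyleSender
    {
        public readonly List<string> wire = new List<string>(); // delivery order
        readonly Queue<string> queue = new Queue<string>();
        const int MaxBatchSize = 4;

        public void Send(string msg)
        {
            if (msg.Length > MaxBatchSize) wire.Add(msg); // sent immediately
            else queue.Enqueue(msg);                      // batched until flush
        }

        public void EndOfFrameFlush()
        {
            while (queue.Count > 0) wire.Add(queue.Dequeue());
        }
    }

    class Demo
    {
        static void Main()
        {
            var sender = new OldStyleSender();
            sender.Send("hi");        // small: queued
            sender.Send("gigantic");  // large: skips the queue
            sender.EndOfFrameFlush();
            // prints "gigantic, hi": the later message arrived first
            Console.WriteLine(string.Join(", ", sender.wire));
        }
    }

Queueing everything, as this commit does, makes wire order match Send order.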

NetworkConnectionToServer.cs

@@ -21,13 +21,17 @@ internal override void Send(ArraySegment<byte> segment, int channelId = Channels
// batching enabled?
if (batching)
{
// try to batch, or send directly if too big.
// (user might try to send a max message sized message,
// where max message size is larger than max batch size.
// for example, kcp2k max message size is 144 KB but we
// only want to batch MTU each time)
if (!GetBatchForChannelId(channelId).AddMessage(segment))
Transport.activeTransport.ClientSend(segment, channelId);
// add to batch no matter what.
// batching will try to fit as many as possible into MTU.
// but we still allow > MTU, e.g. kcp max packet size 144kb.
// those are simply sent as single batches.
//
// IMPORTANT: do NOT send > batch sized messages directly:
// - data race: large messages would be sent directly. small
// messages would be sent in the batch at the end of frame
// - timestamps: if batching assumes a timestamp, then large
// messages need that too.
GetBatchForChannelId(channelId).AddMessage(segment);
}
// otherwise send directly to minimize latency
else Transport.activeTransport.ClientSend(segment, channelId);

BatcherTests.cs

@@ -7,30 +7,21 @@ namespace Mirror.Tests.Batching
public class BatcherTests
{
Batcher batcher;
const int MaxBatchSize = 4;
const int Threshold = 4;
NetworkWriter writer;
[SetUp]
public void SetUp()
{
batcher = new Batcher(MaxBatchSize);
batcher = new Batcher(Threshold);
writer = new NetworkWriter();
}
[Test]
public void AddMessage_AddsToQueue()
public void AddMessage()
{
byte[] message = {0x01, 0x02};
bool result = batcher.AddMessage(new ArraySegment<byte>(message));
Assert.That(result, Is.True);
}
[Test]
public void AddMessage_DetectsTooBig()
{
byte[] message = new byte[MaxBatchSize + 1];
bool result = batcher.AddMessage(new ArraySegment<byte>(message));
Assert.That(result, Is.False);
batcher.AddMessage(new ArraySegment<byte>(message));
}
[Test]
@@ -66,7 +57,7 @@ public void MakeNextBatch_OneMessage()
}
[Test]
public void MakeNextBatch_MultipleMessages_AlmostMaxBatchSize()
public void MakeNextBatch_MultipleMessages_AlmostFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}));
@@ -81,7 +72,7 @@ public void MakeNextBatch_MultipleMessages_AlmostMaxBatchSize()
}
[Test]
public void MakeNextBatch_MultipleMessages_ExactlyMaxBatchSize()
public void MakeNextBatch_MultipleMessages_ExactlyFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}));
@@ -96,7 +87,7 @@ public void MakeNextBatch_MultipleMessages_ExactlyMaxBatchSize()
}
[Test]
public void MakeNextBatch_MultipleMessages_LargerMaxBatchSize()
public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}));
@@ -145,5 +136,67 @@ public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x06, 0x07}));
}
// messages > threshold should simply be single batches.
// those need to be supported too, for example:
// kcp prefers MTU sized batches
// but we still allow up to 144 KB max message size
[Test]
public void MakeNextBatch_LargerThanThreshold()
{
// make a larger than threshold message
byte[] large = new byte[Threshold + 1];
for (int i = 0; i < Threshold + 1; ++i)
large[i] = (byte)i;
batcher.AddMessage(new ArraySegment<byte>(large));
// result should be only the large message
bool result = batcher.MakeNextBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(large));
}
// messages > threshold should simply be single batches.
// those need to be supported too, for example:
// kcp prefers MTU sized batches
// but we still allow up to 144 KB max message size
[Test]
public void MakeNextBatch_LargerThanThreshold_BetweenSmallerMessages()
{
// make a larger than threshold message
byte[] large = new byte[Threshold + 1];
for (int i = 0; i < Threshold + 1; ++i)
large[i] = (byte)i;
// add two small, one large, two small messages.
// to make sure everything around it is still batched,
// and the large one is a separate batch.
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02}));
batcher.AddMessage(new ArraySegment<byte>(large));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04}));
// first batch should be the two small messages
bool result = batcher.MakeNextBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x01, 0x02}));
// reset writer
writer.Position = 0;
// second batch should be only the large message
result = batcher.MakeNextBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(large));
// reset writer
writer.Position = 0;
// third batch should be the two small messages
result = batcher.MakeNextBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(new byte[]{0x03, 0x04}));
}
}
}
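
One more edge case worth pinning down in a test, sketched here under the assumption that MakeNextBatch returns false for an empty queue (that early-out sits above the hunk shown in this diff):

    [Test]
    public void MakeNextBatch_NoMessages_ReturnsFalse()
    {
        // nothing was added, so no batch can be made
        bool result = batcher.MakeNextBatch(writer);
        Assert.That(result, Is.EqualTo(false));
    }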