remove batching!

mischa 2024-09-24 15:26:00 +02:00
parent 1f5693e21f
commit ade3d063b7
18 changed files with 22 additions and 987 deletions

View File

@ -1,8 +0,0 @@
fileFormatVersion: 2
guid: 1c38e1bebe9947f8b842a8a57aa2b71c
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,167 +0,0 @@
// batching functionality encapsulated into one class.
// -> less complexity
// -> easy to test
//
// IMPORTANT: we use THRESHOLD batching, not MAX SIZE batching.
// see threshold comments below.
//
// includes timestamp for tick batching.
// -> allows NetworkTransform etc. to use timestamp without including it in
// every single message
using System;
using System.Collections.Generic;
namespace Mirror
{
public class Batcher
{
// batching threshold instead of max size.
// -> small messages are fit into threshold sized batches
// -> messages larger than threshold are single batches
//
// in other words, we fit up to 'threshold' but still allow larger ones
// for two reasons:
// 1.) data races: skipping batching for larger messages would send a
// large spawn message immediately, while others are batched and
// only flushed at the end of the frame
// 2.) timestamp batching: if each batch is expected to contain a
// timestamp, then large messages have to be a batch too. otherwise
// they would not contain a timestamp
readonly int threshold;
// TimeStamp header size. each batch has one.
public const int TimestampSize = sizeof(double);
// Message header size. each message has one.
public static int MessageHeaderSize(int messageSize) =>
Compression.VarUIntSize((ulong)messageSize);
// maximum overhead for a single message.
// useful for the outside to calculate max message sizes.
public static int MaxMessageOverhead(int messageSize) =>
TimestampSize + MessageHeaderSize(messageSize);
// full batches ready to be sent.
// DO NOT queue NetworkMessage, it would box.
// DO NOT queue each serialization separately.
// it would allocate too many writers.
// https://github.com/vis2k/Mirror/pull/3127
// => best to build batches on the fly.
readonly Queue<NetworkWriterPooled> batches = new Queue<NetworkWriterPooled>();
// current batch in progress
NetworkWriterPooled batch;
public Batcher(int threshold)
{
this.threshold = threshold;
}
// add a message for batching
// we allow any sized messages.
// caller needs to make sure they are within max packet size.
public void AddMessage(ArraySegment<byte> message, double timeStamp)
{
// predict the needed size, which is varint(size) + content
int headerSize = Compression.VarUIntSize((ulong)message.Count);
int neededSize = headerSize + message.Count;
// when appending to a batch in progress, check final size.
// if it expands beyond threshold, then we should finalize it first.
// => less than or exactly threshold is fine.
// GetBatch() will finalize it.
// => see unit tests.
if (batch != null &&
batch.Position + neededSize > threshold)
{
batches.Enqueue(batch);
batch = null;
}
// initialize a new batch if necessary
if (batch == null)
{
// borrow from pool. we return it in GetBatch.
batch = NetworkWriterPool.Get();
// write timestamp first.
// -> double precision for accuracy over long periods of time
// -> batches are per-frame, it doesn't matter which message's
// timestamp we use.
batch.WriteDouble(timeStamp);
}
// add serialization to current batch. even if > threshold.
// -> we do allow > threshold sized messages as single batch
// -> WriteBytes instead of WriteSegment because the latter
// would add a size header. we want to write directly.
//
// include size prefix as varint!
// -> fixes NetworkMessage serialization mismatch corrupting the
// next message in a batch.
// -> a _lot_ of time was wasted debugging corrupt batches.
// no easy way to figure out which NetworkMessage has a mismatch.
// -> this is worth everyone's sanity.
// -> varint means we prefix with 1 byte most of the time.
// -> the same issue in NetworkIdentity was why Mirror started!
Compression.CompressVarUInt(batch, (ulong)message.Count);
batch.WriteBytes(message.Array, message.Offset, message.Count);
}
// helper function to copy a batch to writer and return it to pool
static void CopyAndReturn(NetworkWriterPooled batch, NetworkWriter writer)
{
// make sure the writer is fresh to avoid uncertain situations
if (writer.Position != 0)
throw new ArgumentException("GetBatch needs a fresh writer!");
// copy to the target writer
ArraySegment<byte> segment = batch.ToArraySegment();
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
// return batch to pool for reuse
NetworkWriterPool.Return(batch);
}
// get the next batch which is available for sending (if any).
// TODO safely get & return a batch instead of copying to writer?
// TODO could return pooled writer & use GetBatch in a 'using' statement!
public bool GetBatch(NetworkWriter writer)
{
// get first batch from queue (if any)
if (batches.TryDequeue(out NetworkWriterPooled first))
{
CopyAndReturn(first, writer);
return true;
}
// if queue was empty, we can send the batch in progress.
if (batch != null)
{
CopyAndReturn(batch, writer);
batch = null;
return true;
}
// nothing was written
return false;
}
// return all batches to the pool for cleanup
public void Clear()
{
// return batch in progress
if (batch != null)
{
NetworkWriterPool.Return(batch);
batch = null;
}
// return all queued batches
foreach (NetworkWriterPooled queued in batches)
NetworkWriterPool.Return(queued);
batches.Clear();
}
}
}

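For reference, the wire layout the removed Batcher produced, reconstructed as a sketch from AddMessage and GetBatch above. `timeStamp` and `message` stand in for AddMessage's parameters; everything else uses only names from this file.

// a finished batch on the wire:
//   [timestamp : 8 byte double][varint size][payload][varint size][payload]...
NetworkWriterPooled batch = NetworkWriterPool.Get();
batch.WriteDouble(timeStamp);                              // one timestamp per batch
Compression.CompressVarUInt(batch, (ulong)message.Count);  // per-message size prefix
batch.WriteBytes(message.Array, message.Offset, message.Count);
// ...size prefix + payload repeated for every message that fits 'threshold'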
View File

@ -1,11 +0,0 @@
fileFormatVersion: 2
guid: 0afaaa611a2142d48a07bdd03b68b2b3
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,129 +0,0 @@
// un-batching functionality encapsulated into one class.
// -> less complexity
// -> easy to test
//
// includes timestamp for tick batching.
// -> allows NetworkTransform etc. to use timestamp without including it in
// every single message
using System;
using System.Collections.Generic;
namespace Mirror
{
public class Unbatcher
{
// supporting adding multiple batches before GetNextMessage is called.
// just in case.
readonly Queue<NetworkWriterPooled> batches = new Queue<NetworkWriterPooled>();
public int BatchesCount => batches.Count;
// NetworkReader is only created once,
// then pointed to the first batch.
readonly NetworkReader reader = new NetworkReader(new byte[0]);
// timestamp that was written into the batch remotely.
// for the batch that our reader is currently pointed at.
double readerRemoteTimeStamp;
// helper function to start reading a batch.
void StartReadingBatch(NetworkWriterPooled batch)
{
// point reader to it
reader.SetBuffer(batch.ToArraySegment());
// read remote timestamp (double)
// -> AddBatch guarantees that we have at least 8 bytes to read
readerRemoteTimeStamp = reader.ReadDouble();
}
// add a new batch.
// returns true if valid.
// returns false if not, in which case the connection should be disconnected.
public bool AddBatch(ArraySegment<byte> batch)
{
// IMPORTANT: ArraySegment is only valid until returning. we copy it!
//
// NOTE: it's not possible to create empty ArraySegments, so we
// don't need to check against that.
// make sure we have at least 8 bytes to read for tick timestamp
if (batch.Count < Batcher.TimestampSize)
return false;
// put into a (pooled) writer
// -> WriteBytes instead of WriteSegment because the latter
// would add a size header. we want to write directly.
// -> will be returned to pool when sending!
NetworkWriterPooled writer = NetworkWriterPool.Get();
writer.WriteBytes(batch.Array, batch.Offset, batch.Count);
// first batch? then point reader there
if (batches.Count == 0)
StartReadingBatch(writer);
// add batch
batches.Enqueue(writer);
//Debug.Log($"Adding Batch {BitConverter.ToString(batch.Array, batch.Offset, batch.Count)} => batches={batches.Count} reader={reader}");
return true;
}
// get next message, unpacked from batch (if any)
// message ArraySegment is only valid until the next call.
// timestamp is the REMOTE time when the batch was created remotely.
public bool GetNextMessage(out ArraySegment<byte> message, out double remoteTimeStamp)
{
message = default;
remoteTimeStamp = 0;
// do nothing if we don't have any batches.
// otherwise the below queue.Dequeue() would throw an
// InvalidOperationException if operating on empty queue.
if (batches.Count == 0)
return false;
// was our reader pointed to anything yet?
if (reader.Capacity == 0)
return false;
// no more data to read?
if (reader.Remaining == 0)
{
// retire the batch
NetworkWriterPooled writer = batches.Dequeue();
NetworkWriterPool.Return(writer);
// do we have another batch?
if (batches.Count > 0)
{
// point reader to the next batch.
// we'll return the reader below.
NetworkWriterPooled next = batches.Peek();
StartReadingBatch(next);
}
// otherwise there's nothing more to read
else return false;
}
// use the current batch's remote timestamp
// AFTER potentially moving to the next batch ABOVE!
remoteTimeStamp = readerRemoteTimeStamp;
// enough data to read the size prefix?
if (reader.Remaining == 0)
return false;
// read the size prefix as varint
// see Batcher.AddMessage comments for explanation.
int size = (int)Compression.DecompressVarUInt(reader);
// validate size prefix, in case attackers send malicious data
if (reader.Remaining < size)
return false;
// return the message of size
message = reader.ReadBytesSegment(size);
return true;
}
}
}

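The matching read side, condensed from AddBatch and GetNextMessage above into one sketch. `receivedBatch` stands in for one batch as delivered by the transport; the calls are the ones used in this file.

// one batch: [timestamp : 8 byte double][varint size][payload]...
NetworkReader reader = new NetworkReader(receivedBatch);
double remoteTimeStamp = reader.ReadDouble();               // per-batch timestamp
while (reader.Remaining > 0)
{
    int size = (int)Compression.DecompressVarUInt(reader);  // per-message size prefix
    if (reader.Remaining < size) break;                     // corrupt or malicious batch
    ArraySegment<byte> message = reader.ReadBytesSegment(size);
    // ...handle message
}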
View File

@ -1,11 +0,0 @@
fileFormatVersion: 2
guid: 328562d71e1c45c58581b958845aa7a4
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -44,21 +44,10 @@ internal override void Update()
NetworkWriterPooled writer = queue.Dequeue();
ArraySegment<byte> message = writer.ToArraySegment();
// OnTransportData assumes a proper batch with timestamp etc.
// let's make a proper batch and pass it to OnTransportData.
Batcher batcher = GetBatchForChannelId(Channels.Reliable);
batcher.AddMessage(message, NetworkTime.localTime);
using (NetworkWriterPooled batchWriter = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
if (batcher.GetBatch(batchWriter))
{
NetworkServer.OnTransportData(connectionId, batchWriter.ToArraySegment(), Channels.Reliable);
}
}
NetworkWriterPool.Return(writer);
NetworkWriter fullWriter = new NetworkWriter();
fullWriter.WriteDouble(NetworkTime.localTime); // remote timestamp
fullWriter.WriteBytes(message.Array, message.Offset, message.Count);
NetworkServer.OnTransportData(connectionId, fullWriter, Channels.Reliable);
}
}

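The added lines above, their counterpart in LocalConnectionToServer, and NetworkConnection.Send below all write the same replacement framing. A sketch of the new layout, with the receive side as it appears in NetworkClient/NetworkServer.OnTransportData further down:

// new layout, one transport packet per message:
//   [remote timestamp : 8 byte double][message bytes : rest of packet]
// no per-message size prefix is needed, since each packet carries exactly one message:
NetworkReader fullReader = new NetworkReader(data);
double remoteTimestamp = fullReader.ReadDouble();
ArraySegment<byte> message = fullReader.ReadBytesSegment(fullReader.Remaining);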
View File

@ -57,21 +57,10 @@ internal override void Update()
NetworkWriterPooled writer = queue.Dequeue();
ArraySegment<byte> message = writer.ToArraySegment();
// OnTransportData assumes a proper batch with timestamp etc.
// let's make a proper batch and pass it to OnTransportData.
Batcher batcher = GetBatchForChannelId(Channels.Reliable);
batcher.AddMessage(message, NetworkTime.localTime);
using (NetworkWriterPooled batchWriter = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
if (batcher.GetBatch(batchWriter))
{
NetworkClient.OnTransportData(batchWriter.ToArraySegment(), Channels.Reliable);
}
}
NetworkWriterPool.Return(writer);
NetworkWriter fullWriter = new NetworkWriter();
fullWriter.WriteDouble(NetworkTime.localTime); // remote timestamp
fullWriter.WriteBytes(message.Array, message.Offset, message.Count);
NetworkClient.OnTransportData(fullWriter, Channels.Reliable);
}
// should we still process a disconnected event?

View File

@ -124,8 +124,6 @@ public static partial class NetworkClient
internal static readonly Dictionary<ulong, NetworkIdentity> spawnableObjects =
new Dictionary<ulong, NetworkIdentity>();
internal static Unbatcher unbatcher = new Unbatcher();
// interest management component (optional)
// only needed for SetHostVisibility
public static InterestManagementBase aoi;
@ -191,12 +189,6 @@ static void Initialize(bool hostMode)
// Debug.Log($"Client Connect: {address}");
Debug.Assert(Transport.active != null, "There was no active transport when calling NetworkClient.Connect, If you are calling Connect manually then make sure to set 'Transport.active' first");
// reset unbatcher in case any batches from last session remain.
// need to do this in Initialize() so it runs for the host as well.
// fixes host mode scene transition receiving data from previous scene.
// credits: BigBoxVR
unbatcher = new Unbatcher();
// reset time interpolation on every new connect.
// ensures last sessions' state is cleared before starting again.
InitTimeInterpolation();
@ -327,22 +319,9 @@ internal static void OnTransportData(ArraySegment<byte> data, int channelId)
{
if (connection != null)
{
// server might batch multiple messages into one packet.
// feed it to the Unbatcher.
// NOTE: we don't need to associate a channelId because we
// always process all messages in the batch.
if (!unbatcher.AddBatch(data))
{
if (exceptionsDisconnect)
{
Debug.LogError($"NetworkClient: failed to add batch, disconnecting.");
connection.Disconnect();
}
else
Debug.LogWarning($"NetworkClient: failed to add batch.");
return;
}
NetworkReader fullReader = new NetworkReader(data);
double remoteTimestamp = fullReader.ReadDouble();
ArraySegment<byte> message = fullReader.ReadBytesSegment(fullReader.Remaining);
// process all messages in the batch.
// only while NOT loading a scene.
@ -354,8 +333,7 @@ internal static void OnTransportData(ArraySegment<byte> data, int channelId)
// would only be processed when OnTransportData is called
// the next time.
// => consider moving processing to NetworkEarlyUpdate.
while (!isLoadingScene &&
unbatcher.GetNextMessage(out ArraySegment<byte> message, out double remoteTimestamp))
if (!isLoadingScene)
{
using (NetworkReaderPooled reader = NetworkReaderPool.Get(message))
{
@ -401,27 +379,6 @@ internal static void OnTransportData(ArraySegment<byte> data, int channelId)
}
}
}
// if we weren't interrupted by a scene change,
// then all batched messages should have been processed now.
// otherwise batches would silently grow.
// we need to log an error to avoid debugging hell.
//
// EXAMPLE: https://github.com/vis2k/Mirror/issues/2882
// -> UnpackAndInvoke silently returned because no handler for id
// -> Reader would never be read past the end
// -> Batch would never be retired because end is never reached
//
// NOTE: prefixing every message in a batch with a length would
// avoid ever not reading to the end. for extra bandwidth.
//
// IMPORTANT: always keep this check to detect memory leaks.
// this took half a day to debug last time.
if (!isLoadingScene && unbatcher.BatchesCount > 0)
{
Debug.LogError($"Still had {unbatcher.BatchesCount} batches remaining after processing, even though processing was not interrupted by a scene change. This should never happen, as it would cause ever growing batches.\nPossible reasons:\n* A message didn't deserialize as much as it serialized\n* There was no message handler for a message id, so the reader wasn't read until the end.");
}
}
else Debug.LogError("Skipped Data message handling because connection is null.");
}
@ -1962,8 +1919,6 @@ public static void Shutdown()
isLoadingScene = false;
lastSendTime = 0;
unbatcher = new Unbatcher();
// clear events. someone might have hooked into them before, but
// we don't want to use those hooks after Shutdown anymore.
OnConnectedEvent = null;

View File

@ -41,17 +41,6 @@ public abstract class NetworkConnection
// Works fine with NetworkIdentity pointers though.
public readonly HashSet<NetworkIdentity> owned = new HashSet<NetworkIdentity>();
// batching from server to client & client to server.
// fewer transport calls give us significantly better performance/scale.
//
// for a 64KB max message transport and 64 bytes/message on average, we
// reduce transport calls by a factor of 1000.
//
// depending on the transport, this can give 10x performance.
//
// Dictionary<channelId, batch> because we have multiple channels.
protected Dictionary<int, Batcher> batches = new Dictionary<int, Batcher>();
/// <summary>last batch's remote timestamp. not interpolated. useful for NetworkTransform etc.</summary>
// for any given NetworkMessage/Rpc/Cmd/OnSerialize, this was the time
// on the REMOTE END when it was sent.
@ -73,24 +62,6 @@ internal NetworkConnection(int networkConnectionId) : this()
connectionId = networkConnectionId;
}
// TODO if we only have Reliable/Unreliable, then we could initialize
// two batches and avoid this code
protected Batcher GetBatchForChannelId(int channelId)
{
// get existing or create new writer for the channelId
Batcher batch;
if (!batches.TryGetValue(channelId, out batch))
{
// get max batch size for this channel
int threshold = Transport.active.GetBatchThreshold(channelId);
// create batcher
batch = new Batcher(threshold);
batches[channelId] = batch;
}
return batch;
}
// Send stage one: NetworkMessage<T>
/// <summary>Send a NetworkMessage to this connection over the given channel.</summary>
public void Send<T>(T message, int channelId = Channels.Reliable)
@ -144,7 +115,11 @@ internal virtual void Send(ArraySegment<byte> segment, int channelId = Channels.
//
// NOTE: we do NOT ValidatePacketSize here yet. the final packet
// will be the full batch, including timestamp.
GetBatchForChannelId(channelId).AddMessage(segment, NetworkTime.localTime);
NetworkWriter fullWriter = new NetworkWriter();
fullWriter.WriteDouble(NetworkTime.localTime); // remote timestamp
fullWriter.WriteBytes(segment.Array, segment.Offset, segment.Count);
SendToTransport(fullWriter, channelId);
}
// Send stage three: hand off to transport
@ -153,30 +128,6 @@ internal virtual void Send(ArraySegment<byte> segment, int channelId = Channels.
// flush batched messages at the end of every Update.
internal virtual void Update()
{
// go through batches for all channels
// foreach ((int key, Batcher batcher) in batches) // Unity 2020 doesn't support deconstruct yet
foreach (KeyValuePair<int, Batcher> kvp in batches)
{
// make and send as many batches as necessary from the stored
// messages.
using (NetworkWriterPooled writer = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
while (kvp.Value.GetBatch(writer))
{
// message size is validated in Send<T>, with test coverage.
// we can send directly without checking again.
ArraySegment<byte> segment = writer.ToArraySegment();
// send to transport
SendToTransport(segment, kvp.Key);
//UnityEngine.Debug.Log($"sending batch of {writer.Position} bytes for channel={kvp.Key} connId={connectionId}");
// reset writer for each new batch
writer.Position = 0;
}
}
}
}
/// <summary>Check if we received a message within the last 'timeout' seconds.</summary>
@ -208,10 +159,6 @@ internal virtual void Update()
// never be returned to the pool.
public virtual void Cleanup()
{
foreach (Batcher batcher in batches.Values)
{
batcher.Clear();
}
}
public override string ToString() => $"connection({connectionId})";

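One design note on the added Send path: it allocates a fresh NetworkWriter per message. A pooled variant, sketched with the NetworkWriterPool using-pattern seen elsewhere in this diff, would avoid the per-send allocation; it assumes the transport copies the data before the writer is returned, as the removed batching code already did.

using (NetworkWriterPooled fullWriter = NetworkWriterPool.Get())
{
    fullWriter.WriteDouble(NetworkTime.localTime);  // remote timestamp
    fullWriter.WriteBytes(segment.Array, segment.Offset, segment.Count);
    SendToTransport(fullWriter, channelId);         // same call as the added lines
}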
View File

@ -20,9 +20,6 @@ public class NetworkConnectionToClient : NetworkConnection
// TODO move to server's NetworkConnectionToClient?
public readonly HashSet<NetworkIdentity> observing = new HashSet<NetworkIdentity>();
// unbatcher
public Unbatcher unbatcher = new Unbatcher();
// server runs a time snapshot interpolation for each client's local time.
// this is necessary for client auth movement to still be smooth on the
// server for host mode.

View File

@ -65,7 +65,7 @@ public static int MaxContentSize(int channelId)
{
// calculate the max possible size that can fit in a batch
int transportMax = Transport.active.GetMaxPacketSize(channelId);
return transportMax - IdSize - Batcher.MaxMessageOverhead(transportMax);
return transportMax - IdSize - 8; // - remoteTimestamp
}
// max message size which includes header + content.

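Worked numbers for the changed return, assuming a transport max packet size of 1500 bytes, Mirror's 2 byte message id, and a 2 byte varint size prefix at this size (all three values illustrative, not from the diff):

// before: transportMax - IdSize - Batcher.MaxMessageOverhead(transportMax)
//         = 1500 - 2 - (8 byte timestamp + 2 byte varint size prefix) = 1488
// after:  transportMax - IdSize - 8
//         = 1500 - 2 - 8 = 1490   // only the remoteTimestamp remains as overhead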
View File

@ -898,22 +898,9 @@ internal static void OnTransportData(int connectionId, ArraySegment<byte> data,
{
if (connections.TryGetValue(connectionId, out NetworkConnectionToClient connection))
{
// client might batch multiple messages into one packet.
// feed it to the Unbatcher.
// NOTE: we don't need to associate a channelId because we
// always process all messages in the batch.
if (!connection.unbatcher.AddBatch(data))
{
if (exceptionsDisconnect)
{
Debug.LogError($"NetworkServer: received message from connectionId:{connectionId} was too short (messages should start with message id). Disconnecting.");
connection.Disconnect();
}
else
Debug.LogWarning($"NetworkServer: received message from connectionId:{connectionId} was too short (messages should start with message id).");
return;
}
NetworkReader fullReader = new NetworkReader(data);
double remoteTimestamp = fullReader.ReadDouble();
ArraySegment<byte> message = fullReader.ReadBytesSegment(fullReader.Remaining);
// process all messages in the batch.
// only while NOT loading a scene.
@ -925,8 +912,7 @@ internal static void OnTransportData(int connectionId, ArraySegment<byte> data,
// would only be processed when OnTransportData is called
// the next time.
// => consider moving processing to NetworkEarlyUpdate.
while (!isLoadingScene &&
connection.unbatcher.GetNextMessage(out ArraySegment<byte> message, out double remoteTimestamp))
if (!isLoadingScene)
{
using (NetworkReaderPooled reader = NetworkReaderPool.Get(message))
{
@ -973,26 +959,6 @@ internal static void OnTransportData(int connectionId, ArraySegment<byte> data,
}
}
}
// if we weren't interrupted by a scene change,
// then all batched messages should have been processed now.
// otherwise batches would silently grow.
// we need to log an error to avoid debugging hell.
//
// EXAMPLE: https://github.com/vis2k/Mirror/issues/2882
// -> UnpackAndInvoke silently returned because no handler for id
// -> Reader would never be read past the end
// -> Batch would never be retired because end is never reached
//
// NOTE: prefixing every message in a batch with a length would
// avoid ever not reading to the end. for extra bandwidth.
//
// IMPORTANT: always keep this check to detect memory leaks.
// this took half a day to debug last time.
if (!isLoadingScene && connection.unbatcher.BatchesCount > 0)
{
Debug.LogError($"Still had {connection.unbatcher.BatchesCount} batches remaining after processing, even though processing was not interrupted by a scene change. This should never happen, as it would cause ever growing batches.\nPossible reasons:\n* A message didn't deserialize as much as it serialized\n* There was no message handler for a message id, so the reader wasn't read until the end.");
}
}
else Debug.LogError($"HandleData Unknown connectionId:{connectionId}");
}

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 6d113040314b4578b93a6f140f064f09
timeCreated: 1623240703

View File

@ -1,304 +0,0 @@
using System;
using System.Linq;
using NUnit.Framework;
namespace Mirror.Tests.Batching
{
public class BatcherTests
{
Batcher batcher;
NetworkWriter writer;
// threshold to test batcher with multiple batches.
// each batch can be 8 bytes timestamp + 6 bytes data
const int Threshold = 8 + 6;
// timestamp for convenience
const double TimeStamp = Math.PI;
[SetUp]
public void SetUp()
{
batcher = new Batcher(Threshold);
writer = new NetworkWriter();
}
// helper function to create a batch prefixed by timestamp
public static byte[] MakeBatch(double tickTimeStamp, byte[] message)
{
NetworkWriter writer = new NetworkWriter();
writer.WriteDouble(tickTimeStamp);
Compression.CompressVarUInt(writer, (ulong)message.Length);
writer.WriteBytes(message, 0, message.Length);
return writer.ToArray();
}
public static byte[] MakeBatch(double tickTimeStamp, byte[] messageA, byte[] messageB)
{
NetworkWriter writer = new NetworkWriter();
writer.WriteDouble(tickTimeStamp);
Compression.CompressVarUInt(writer, (ulong)messageA.Length);
writer.WriteBytes(messageA, 0, messageA.Length);
Compression.CompressVarUInt(writer, (ulong)messageB.Length);
writer.WriteBytes(messageB, 0, messageB.Length);
return writer.ToArray();
}
[Test]
public void AddMessage()
{
byte[] message = {0x01, 0x02};
batcher.AddMessage(new ArraySegment<byte>(message), TimeStamp);
}
[Test]
public void MakeNextBatch_OnlyAcceptsFreshWriter()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
writer.WriteByte(0);
Assert.Throws<ArgumentException>(() => {
batcher.GetBatch(writer);
});
}
[Test]
public void MakeNextBatch_NoMessage()
{
// make batch with no message
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(false));
}
[Test]
public void MakeNextBatch_OneMessage()
{
// add message
byte[] message = {0x01, 0x02};
batcher.AddMessage(new ArraySegment<byte>(message), TimeStamp);
// make batch
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, message)));
}
[Test]
public void MakeNextBatch_MultipleMessages_AlmostFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}), TimeStamp);
// make batch
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x01, 0x02}, new byte[]{0x03})));
// there should be no more batches to make
Assert.That(batcher.GetBatch(writer), Is.False);
}
[Test]
public void MakeNextBatch_MultipleMessages_ExactlyFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}), TimeStamp);
// make batch
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x01, 0x02}, new byte[]{0x03, 0x04})));
// there should be no more batches to make
Assert.That(batcher.GetBatch(writer), Is.False);
}
[Test]
public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
{
// with 1 byte size headers that's 3 + 3 + 2 = 8 bytes of content, over the 6 byte budget, so a second batch is needed
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x05}), TimeStamp);
// first batch
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x01, 0x02}, new byte[]{0x03, 0x04})));
// reset writer
writer.Position = 0;
// second batch
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x05})));
}
[Test]
public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
{
// small, then one too large to share a batch, then small again
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02, 0x03, 0x04, 0x05}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x06, 0x07}), TimeStamp);
// first batch
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x01})));
// reset writer
writer.Position = 0;
// second batch
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x02, 0x03, 0x04, 0x05})));
// reset writer
writer.Position = 0;
// third batch
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x06, 0x07})));
}
// messages > threshold should simply be single batches.
// those need to be supported too, for example:
// kcp prefers MTU sized batches
// but we still allow up to 144 KB max message size
[Test]
public void MakeNextBatch_LargerThanThreshold()
{
// make a larger than threshold message
byte[] large = new byte[Threshold + 1];
for (int i = 0; i < Threshold + 1; ++i)
large[i] = (byte)i;
batcher.AddMessage(new ArraySegment<byte>(large), TimeStamp);
// result should be only the large message
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, large)));
}
// messages > threshold should simply be single batches.
// those need to be supported too, for example:
// kcp prefers MTU sized batches
// but we still allow up to 144 KB max message size
[Test]
public void MakeNextBatch_LargerThanThreshold_BetweenSmallerMessages()
{
// make a larger than threshold message
byte[] large = new byte[Threshold + 1];
for (int i = 0; i < Threshold + 1; ++i)
large[i] = (byte)i;
// add two small, one large, two small messages.
// to make sure everything around it is still batched,
// and the large one is a separate batch.
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(large), TimeStamp + 1);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}), TimeStamp + 2);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04}), TimeStamp + 2);
// first batch should be the two small messages with size headers
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp, new byte[]{0x01}, new byte[]{0x02})));
// reset writer
writer.Position = 0;
// second batch should be only the large message
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp + 1, large)));
// reset writer
writer.Position = 0;
// third batch should be the two small messages
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(MakeBatch(TimeStamp + 2, new byte[]{0x03}, new byte[]{0x04})));
}
// if a batch contains ABC,
// and unbatching only deserializes half of B,
// then C will end up corrupted,
// and nothing will indicate which message caused it.
// days & weeks were lost on this.
[Test]
public void MessageSerializationMismatch()
{
// batch with correct size
batcher.AddMessage(new ArraySegment<byte>(new byte[]{1}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{2}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{3}), TimeStamp);
Assert.That(batcher.GetBatch(writer), Is.True);
// feed batch to unbatcher
Unbatcher unbatcher = new Unbatcher();
unbatcher.AddBatch(writer);
// read A correctly
Assert.That(unbatcher.GetNextMessage(out ArraySegment<byte> message, out _), Is.True);
NetworkReader reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(1));
// read B only partially.
// this can happen if a NetworkMessage does custom serialization,
// and does early return in Deserialize.
// for example, SmoothSync.
Assert.That(unbatcher.GetNextMessage(out message, out _), Is.True);
// reader = new NetworkReader(message);
// Assert.That(reader.ReadByte(), Is.EqualTo(2));
// read C. this will be corrupted
Assert.That(unbatcher.GetNextMessage(out message, out _), Is.True);
reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(3));
}
[Test]
public void ClearReturnsToPool()
{
int previousCount = NetworkWriterPool.Count;
// add a few messages
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}), TimeStamp);
Assert.That(NetworkWriterPool.Count, Is.LessThan(previousCount));
// clear
batcher.Clear();
Assert.That(NetworkWriterPool.Count, Is.EqualTo(previousCount));
}
}
}

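The test Threshold above works out as follows; arithmetic only, derived from the constants and helpers in this file.

// Threshold = 8 + 6 = 14 bytes per batch:
//   [timestamp : 8] + per message [varint size : 1][payload]
// two 2 byte messages:  8 + (1+2) + (1+2) = 14 -> exactly full (ExactlyFullBatch)
// one more message:     would exceed 14        -> second batch (MoreThanOneBatch)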
View File

@ -1,11 +0,0 @@
fileFormatVersion: 2
guid: 787d83b7e2ca4547aca251617d91f7d8
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,137 +0,0 @@
using System;
using NUnit.Framework;
namespace Mirror.Tests.Batching
{
public class UnbatcherTests
{
Unbatcher unbatcher;
const double TimeStamp = Math.PI;
[SetUp]
public void SetUp()
{
unbatcher = new Unbatcher();
}
[Test]
public void GetNextMessage_NoBatches()
{
bool result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
// test for nimoyd bug, where calling GetNextMessage after the previous
// call already returned false would cause an InvalidOperationException.
[Test]
public void GetNextMessage_True_False_False_InvalidOperationException()
{
// add batch
byte[] batch = BatcherTests.MakeBatch(TimeStamp, new byte[2]);
unbatcher.AddBatch(new ArraySegment<byte>(batch));
// get next message, pretend we read the whole thing
bool result = unbatcher.GetNextMessage(out ArraySegment<byte> message, out _);
Assert.That(result, Is.True);
// shouldn't get another one
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
// calling it again was causing "InvalidOperationException: Queue empty"
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
[Test]
public void GetNextMessage_OneBatch()
{
// add one batch
byte[] batch = BatcherTests.MakeBatch(TimeStamp, new byte[] {0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(batch));
// get next message
bool result = unbatcher.GetNextMessage(out ArraySegment<byte> message, out double remoteTimeStamp);
NetworkReader reader = new NetworkReader(message);
Assert.That(result, Is.True);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// there should be no more messages
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
[Test]
public void GetNextMessage_MultipleBatches()
{
// add first batch
byte[] firstBatch = BatcherTests.MakeBatch(TimeStamp, new byte[] {0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(firstBatch));
// add second batch
byte[] secondBatch = BatcherTests.MakeBatch(TimeStamp + 1, new byte[] {0x03, 0x04});
unbatcher.AddBatch(new ArraySegment<byte>(secondBatch));
// get next message, read everything
bool result = unbatcher.GetNextMessage(out ArraySegment<byte> message, out double remoteTimeStamp);
Assert.That(result, Is.True);
NetworkReader reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// get next message, should point to next batch at Timestamp + 1
result = unbatcher.GetNextMessage(out message, out remoteTimeStamp);
Assert.That(result, Is.True);
reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(0x03));
Assert.That(reader.ReadByte(), Is.EqualTo(0x04));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp + 1));
// there should be no more messages
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
}
// make sure that retiring a batch, then adding a new batch works.
// previously there was a bug where the batch was retired,
// the reader still pointed to the old batch with pos=len,
// a new batch was added,
// and GetNextMessage() still returned false because the reader still pointed to
// the old batch with pos=len.
[Test]
public void RetireBatchAndTryNewBatch()
{
// add first batch
byte[] firstBatch = BatcherTests.MakeBatch(TimeStamp, new byte[] {0x01, 0x02});
unbatcher.AddBatch(new ArraySegment<byte>(firstBatch));
// read everything
bool result = unbatcher.GetNextMessage(out ArraySegment<byte> message, out double remoteTimeStamp);
Assert.That(result, Is.True);
NetworkReader reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(0x01));
Assert.That(reader.ReadByte(), Is.EqualTo(0x02));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp));
// try to read again.
// reader will be at limit, which should retire the batch.
result = unbatcher.GetNextMessage(out _, out _);
Assert.That(result, Is.False);
// add new batch
byte[] secondBatch = BatcherTests.MakeBatch(TimeStamp + 1, new byte[] {0x03, 0x04});
unbatcher.AddBatch(new ArraySegment<byte>(secondBatch));
// read everything
result = unbatcher.GetNextMessage(out message, out remoteTimeStamp);
Assert.That(result, Is.True);
reader = new NetworkReader(message);
Assert.That(reader.ReadByte(), Is.EqualTo(0x03));
Assert.That(reader.ReadByte(), Is.EqualTo(0x04));
Assert.That(remoteTimeStamp, Is.EqualTo(TimeStamp + 1));
}
}
}

View File

@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: ccc928bb22f5469886cef8c6132aa717
timeCreated: 1623240730

View File

@ -148,29 +148,5 @@ public void ShutdownCleanup()
Assert.That(NetworkClient.OnErrorEvent, Is.Null);
Assert.That(NetworkClient.OnTransportExceptionEvent, Is.Null);
}
// test to prevent a bug where host mode scene transitions would
// still receive a previous scene's data.
[Test]
public void ConnectHostResetsUnbatcher()
{
// listen & connect host
NetworkServer.Listen(1);
ConnectHostClientBlockingAuthenticatedAndReady();
// add some data to unbatcher, disconnect.
// need at least Batcher.TimestampSize for it to be counted as a batch
NetworkClient.isLoadingScene = true;
byte[] data = new byte[]{1,2,3,4,5,6,7,8};
NetworkClient.OnTransportData(new ArraySegment<byte>(data), Channels.Reliable);
NetworkClient.Disconnect();
NetworkServer.DisconnectAll();
Assert.That(NetworkClient.unbatcher.BatchesCount, Is.EqualTo(1));
// batches should be cleared when connecting again.
// otherwise we would get invalid messages from last time.
ConnectHostClientBlockingAuthenticatedAndReady();
Assert.That(NetworkClient.unbatcher.BatchesCount, Is.EqualTo(0));
}
}
}