InputAccumulator (Reliable)

This commit is contained in:
mischa 2023-08-31 11:04:14 +02:00
parent ff56210a36
commit fd15eb7794
8 changed files with 528 additions and 0 deletions

View File

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: 82c86372b0b95431398582e3a0095370
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,72 @@
using System;
namespace Mirror
{
public struct ClientInput
{
// inputs need a unique ID for acknowledgement from the server.
// TODO this can be optimized later, for now let's stay with uint.
public uint inputId;
// TODO this definitely needs to be optimized later. maybe with hash?
public string input;
// serialized parameters for the input, i.e. position.
// TODO NONALLOC/POOLED
public NetworkWriter parameters;
public double timestamp;
// UNRELIABLE:
// keep track of how many times we attempted to send this input unreliably
// this is useful to detect issues.
// public int sendAttempts;
// UNRELIABLE:
// inputs are sent over unreliable.
// server will tell us when it received an input with inputId.
// in that case, set acked and don't retransmit.
// public bool acked;
public ClientInput(uint inputId, string input, NetworkWriter parameters, double timestamp)
{
this.inputId = inputId;
this.input = input;
this.parameters = parameters;
this.timestamp = timestamp;
// UNRELIABLE:
// this.sendAttempts = 0;
// this.acked = false;
}
}
// add NetworkReader/Writer extensions for ClientInput type
public static class InputAccumulatorSerialization
{
public static void WriteClientInput(this NetworkWriter writer, ClientInput input)
{
writer.WriteUInt(input.inputId);
writer.WriteString(input.input);
writer.WriteArraySegmentAndSize(input.parameters);
writer.WriteDouble(input.timestamp);
}
public static ClientInput ReadClientInput(this NetworkReader reader)
{
uint inputId = reader.ReadUInt();
string input = reader.ReadString();
ArraySegment<byte> parameters = reader.ReadArraySegmentAndSize();
double timestamp = reader.ReadDouble();
// wrap parameter bytes in a writer.
// if there were no parameters, 'parameters' is default/null.
// in that case, don't copy anything otherwise we get a nullref.
NetworkWriter writer = new NetworkWriter();
if (parameters.Array != null)
{
writer.WriteBytes(parameters.Array, parameters.Offset, parameters.Count);
}
return new ClientInput(inputId, input, writer, timestamp);
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 946abd35216042cd91d557c096a1a397
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,234 @@
// Input Accumulator for prediction.
//
// everything goes over reliable channel for now: make it work, then make it fast!
//
// in the future we want to send client inputs to the server immediately over
// unreliable. some will get lost, so we always want to send the last N inputs
// at once.
//
// based on Overwatch GDC talk: https://www.youtube.com/watch?v=zrIY0eIyqmI
//
// usage:
// - inherit and customize this for your player
// - add the component to the player prefab
// - channel all inputs through this component
// for example, when firing call GetComponent<InputAccumulator>().Fire()
using System;
using System.Collections.Generic;
using UnityEngine;
namespace Mirror
{
public abstract class InputAccumulator : NetworkBehaviour
{
uint nextInputId = 1;
// history limit as byte to enforce max 255 (which saves bandwidth)
[Tooltip("Keep this many inputs to sync to server in a batch, and to apply reconciliation.\nDon't change this at runtime!")]
public byte historyLimit = 64;
// UNRELIABLE:
// [Tooltip("How many times the client will attempt to (unreliably) send an input before giving up.")]
// public int attemptLimit = 16;
// input history with both acknowledged and unacknowledged inputs.
// => unacknowledged inputs are still being resent
// => acknowledged are kept for later in case of reconciliation
internal readonly Queue<ClientInput> history = new Queue<ClientInput>();
double lastSendTime;
// record input by name, i.e. "Fire".
// make sure to use const strings like "Fire" to avoid allocations.
// "Fire{i}" would allocate.
// returns true if there was space in history, false otherwise.
// if it returns false, it's best not to apply the player input to the world.
protected bool RecordInput(string inputName, NetworkWriter parameters)
{
// keep history limit
if (history.Count >= historyLimit)
{
// the oldest entry is only safe to dequeue if the server acked it.
// otherwise the server wouldn't never receive & apply it.
// if (!history.Peek().acked)
// {
// // best to warn and drop it.
// Debug.LogWarning($"Input {inputName} on {name} with netId={netId} will be dropped because history is full and the oldest input hasn't been acknowledged yet.");
// return false;
// }
history.Dequeue();
}
// record it with a new input id
ClientInput input = new ClientInput(nextInputId, inputName, parameters, NetworkTime.time);
nextInputId += 1;
history.Enqueue(input);
// send it to the server over reliable for now.
// in the future, N inputs will be squashed into one unreliable message.
using (NetworkWriterPooled writer = NetworkWriterPool.Get())
{
writer.Write(input);
CmdSendInput(writer);
}
return true;
}
// called on server after receiving a new client input.
// TODO pass batch remoteTime? server knows this but may be easier to pass here too.
protected abstract void ApplyInputOnServer(string inputName);
[Command]
void CmdSendInput(ArraySegment<byte> serializedInput)
{
// deserialize input
using (NetworkReaderPooled reader = NetworkReaderPool.Get(serializedInput))
{
ClientInput input = reader.ReadClientInput();
// UNRELIABLE
// send ack message to client.
// at the moment this is sent reliabily.
// TODO keep history of acked so we don't send twice?
// client may still send it a few times before it gets ack.
// TargetSendInputAck(input.inputId);
// process the input on server
ApplyInputOnServer(input.input);
}
}
// UNRELIABLE
/*
// squash all history inputs and send them to the server in one unreliable message
protected void Flush()
{
using (NetworkWriterPooled writer = NetworkWriterPool.Get())
{
// write each client input that the server hasn't acknowledged yet.
// we are also flagging each input's send attempts.
// which means that we need to iterate with dequeue/enqueue again.
// foreach can't modify while iterating.
// foreach (ClientInput input in history)
// {
// if (!input.acked)
// writer.WriteClientInput(input);
// }
int count = history.Count;
for (int i = 0; i < count; ++i)
{
ClientInput input = history.Dequeue();
if (!input.acked)
{
writer.WriteClientInput(input);
input.sendAttempts += 1;
// give up after too many attempts
if (input.sendAttempts > attemptLimit)
{
// TODO maybe this should disconnect?
Debug.LogWarning($"Input {input.input} on {name} with netId={netId} will be dropped because it was sent {input.sendAttempts} times and never acknowledged by the server.");
continue; // continue to the next, don't Enqueue
}
}
history.Enqueue(input);
}
CmdSendInputBatch(writer.ToArraySegment());
}
}
bool IsNewInputId(uint inputId)
{
// TODO faster
foreach (ClientInput input in history)
{
if (input.inputId == inputId)
return false;
}
return true;
}
// prediction should send input immediately over unreliable.
// latency is key.
// we batch together the last N inputs to make up for unreliable loss.
// [Command(channel = Channels.Unreliable)]
void CmdSendInputBatch(ArraySegment<byte> inputBatch)
{
// deserialize inputs
using (NetworkReaderPooled reader = NetworkReaderPool.Get(inputBatch))
{
// read each client input
while (reader.Remaining > 0)
{
ClientInput input = reader.ReadClientInput();
// UDP messages may arrive twice.
// only process and apply the same input once though!
if (IsNewInputId(input.inputId))
{
// TODO for unreliable, we need to ensure inputs are applied in same order!
// send ack message to client.
// at the moment this is sent reliabily.
// TODO keep history of acked so we don't send twice?
// client may still send it a few times before it gets ack.
TargetSendInputAck(input.inputId);
// process the input on server
ApplyInputOnServer(input.input);
}
}
}
}
// acknowledge an inputId, which flags it as acked on the client.
// client will then stop sending it to the server.
// standalone function (not rpc) for easier testing.
// TODO batch & optimize to minimize bandwidth later
internal void AcknowledgeInput(uint inputId)
{
// we can't modify Queue elements while iterating.
// we'll have to deqeueue + enqueue each of them once for now.
// TODO faster lookup?
int count = history.Count;
for (int i = 0; i < count; ++i)
{
ClientInput input = history.Dequeue();
if (input.inputId == inputId)
{
// flag as acked, but keep in history for reconciliation later
input.acked = true;
}
history.Enqueue(input);
}
}
// server sends acknowledgements for received inputs to client
// TODO unreliable? this isn't latency sensitive though.
// with reliable, at least we can guarantee it's gonna be delivered
[TargetRpc]
void TargetSendInputAck(uint inputId) => AcknowledgeInput(inputId);
void Update()
{
if (isLocalPlayer)
{
// UNRELIABLE
// TODO we don't have OnSerializeUnreliable yet.
// send manually for now
// if (NetworkTime.time >= lastSendTime + syncInterval)
// {
// Flush();
// lastSendTime = NetworkTime.time;
// }
}
}
*/
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: dc1d73b5baeb84a84abe9f06e8fa3fd0
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 3dbcea55c1a74b4bb1d690db23aeae2a
timeCreated: 1692350915

View File

@ -0,0 +1,186 @@
using System.Collections.Generic;
using NUnit.Framework;
using UnityEngine;
namespace Mirror.Tests.Prediction
{
class MockInputAccumulator : InputAccumulator
{
public List<string> serverInputs = new List<string>();
// expose protected functions for testing
public new bool RecordInput(string inputName, NetworkWriter parameters)
=> base.RecordInput(inputName, parameters);
// public new void Flush()
// => base.Flush();
protected override void ApplyInputOnServer(string inputName)
=> serverInputs.Add(inputName);
}
public class InputAccumulatorTests : MirrorTest
{
MockInputAccumulator serverComp;
MockInputAccumulator clientComp;
const int Limit = 4;
[SetUp]
public override void SetUp()
{
base.SetUp();
NetworkServer.Listen(1);
ConnectClientBlockingAuthenticatedAndReady(out NetworkConnectionToClient connectionToClient);
CreateNetworkedAndSpawnPlayer(
out _, out _, out serverComp,
out _, out _, out clientComp,
connectionToClient);
serverComp.historyLimit = Limit;
clientComp.historyLimit = Limit;
}
[TearDown]
public override void TearDown()
{
base.TearDown();
}
[Test]
public void InputAccumulatorSerialization_WithoutParameters()
{
// write an input without any parameters
NetworkWriter parameters = new NetworkWriter();
NetworkWriter writer = new NetworkWriter();
ClientInput input = new ClientInput(42, "Fire", parameters, 2.0);
writer.WriteClientInput(input);
// read
NetworkReader reader = new NetworkReader(writer);
ClientInput result = reader.ReadClientInput();
// check
Assert.That(result.inputId, Is.EqualTo(input.inputId));
Assert.That(result.input, Is.EqualTo(input.input));
Assert.That(result.parameters.Position, Is.EqualTo(0));
Assert.That(result.timestamp, Is.EqualTo(input.timestamp));
}
[Test]
public void InputAccumulatorSerialization_WithParameters()
{
// write an input with a few parameters
NetworkWriter parameters = new NetworkWriter();
parameters.WriteVector3(new Vector3(1, 2, 3));
NetworkWriter writer = new NetworkWriter();
ClientInput input = new ClientInput(42, "Fire", parameters, 2.0);
writer.WriteClientInput(input);
// read
NetworkReader reader = new NetworkReader(writer);
ClientInput result = reader.ReadClientInput();
// check
Assert.That(result.inputId, Is.EqualTo(input.inputId));
Assert.That(result.input, Is.EqualTo(input.input));
NetworkReader parametersReader = new NetworkReader(input.parameters);
Assert.That(parametersReader.Remaining, Is.EqualTo(4 * 3)); // sizeof(Vector3)
Assert.That(parametersReader.ReadVector3(), Is.EqualTo(new Vector3(1, 2, 3)));
Assert.That(result.timestamp, Is.EqualTo(input.timestamp));
}
[Test]
public void Record()
{
// record a few
Assert.That(clientComp.RecordInput("Fire", new NetworkWriter()), Is.True);
Assert.That(clientComp.RecordInput("Jump", new NetworkWriter()), Is.True);
ClientInput[] history = clientComp.history.ToArray();
Assert.That(history.Length, Is.EqualTo(2));
Assert.That(history[0].input, Is.EqualTo("Fire"));
Assert.That(history[0].inputId, Is.EqualTo(1));
Assert.That(history[1].input, Is.EqualTo("Jump"));
Assert.That(history[1].inputId, Is.EqualTo(2));
}
/*
// recording more inputs than 'limit' should drop input if not acked.
[Test]
public void RecordOverLimit_Unacked()
{
// fill to the limit
for (int i = 0; i < Limit; ++i)
Assert.That(clientComp.RecordInput($"Fire{i}", new NetworkWriter()), Is.True);
// try to record another while the oldest is still unacknowledged.
// input should be dropped, not inserted.
Assert.That(clientComp.RecordInput("Extra", new NetworkWriter()), Is.False);
ClientInput[] history = clientComp.history.ToArray();
Assert.That(history.Length, Is.EqualTo(Limit));
Assert.That(history[0].input, Is.EqualTo("Fire0"));
}
// recording more inputs than 'limit' should drop the oldest (if acked).
[Test]
public void RecordOverLimit_Acked()
{
// fill to the limit
for (int i = 0; i < Limit; ++i)
Assert.That(clientComp.RecordInput($"Fire{i}", new NetworkWriter()), Is.True);
// acknowledge the oldest
uint oldestId = clientComp.history.Peek().inputId;
clientComp.AcknowledgeInput(oldestId);
// try to record another while the oldest is acknowledged.
// input should be inserted, and oldest dropped.
Assert.That(clientComp.RecordInput("Extra", new NetworkWriter()), Is.True);
ClientInput[] history = clientComp.history.ToArray();
Assert.That(history.Length, Is.EqualTo(Limit));
Assert.That(history[0].input, Is.EqualTo("Fire1")); // first one is gone
}
[Test]
public void MaxResendAttempts()
{
// insert a few
Assert.That(clientComp.RecordInput("Fire", new NetworkWriter()), Is.True);
Assert.That(clientComp.RecordInput("Jump", new NetworkWriter()), Is.True);
ClientInput[] history = clientComp.history.ToArray();
Assert.That(history.Length, Is.EqualTo(2));
// set one of them to max resends
// can't access queue [i] directly, use dequeue+enqueue instead
ClientInput oldest = clientComp.history.Dequeue();
oldest.sendAttempts = clientComp.attemptLimit;
clientComp.history.Enqueue(oldest);
// flush should remove the one with too many attempts
clientComp.Flush();
history = clientComp.history.ToArray();
Assert.That(history.Length, Is.EqualTo(1));
Assert.That(history[0].input, Is.EqualTo("Jump"));
}
*/
[Test]
public void ClientInputGetSyncedToServer()
{
// insert a few
Assert.That(clientComp.RecordInput("Fire", new NetworkWriter()), Is.True);
Assert.That(clientComp.RecordInput("Jump", new NetworkWriter()), Is.True);
// flush to server
// clientComp.Flush();
ProcessMessages();
// server should've received the inputs
// note there's a small chance for unreliable messages to
// get dropped or arrive out of order, even on localhost.
Assert.That(serverComp.serverInputs.Count, Is.EqualTo(2));
Assert.That(serverComp.serverInputs.Contains("Fire")); // UDP order isn't guaranteed
Assert.That(serverComp.serverInputs.Contains("Jump")); // UDP order isn't guaranteed
}
}
}

View File

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 3c0ad56084da4aacb0b0e5366ac48f31
timeCreated: 1692350921