Basic Telepathy integration

This commit is contained in:
vis2k 2018-08-14 11:13:02 +02:00
parent d3b537fc06
commit 699c379c3c
23 changed files with 1160 additions and 357 deletions

View File

@ -66,11 +66,22 @@
<Compile Include="..\Runtime\NetworkManagerHUD.cs" />
<Compile Include="..\Runtime\NetworkBehaviour.cs" />
<Compile Include="..\Runtime\NetworkIdentity.cs" />
<Compile Include="..\Runtime\Transport.cs" />
<Compile Include="..\Runtime\UNetwork.cs" />
<Compile Include="..\Runtime\NetworkReader.cs" />
<Compile Include="..\Runtime\NetworkServer.cs" />
<Compile Include="..\Runtime\SyncList.cs" />
<Compile Include="..\Runtime\NetworkWriter.cs" />
<Compile Include="..\Runtime\Telepathy\Client.cs" />
<Compile Include="..\Runtime\Telepathy\Common.cs" />
<Compile Include="..\Runtime\Telepathy\EventType.cs" />
<Compile Include="..\Runtime\Telepathy\Logger.cs" />
<Compile Include="..\Runtime\Telepathy\Message.cs" />
<Compile Include="..\Runtime\Telepathy\NetworkStreamExtensions.cs" />
<Compile Include="..\Runtime\Telepathy\SafeCounter.cs" />
<Compile Include="..\Runtime\Telepathy\SafeDictionary.cs" />
<Compile Include="..\Runtime\Telepathy\SafeQueue.cs" />
<Compile Include="..\Runtime\Telepathy\Server.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Target Name="AfterBuild">

View File

@ -46,8 +46,8 @@ internal static void Shutdown()
s_IsReady = false;
s_IsSpawnFinished = false;
NetworkTransport.Shutdown();
NetworkTransport.Init();
Debug.Log("ClientScene.Shutdown calls telepathyClient.Disconnect");
Transport.client.Disconnect();
}
// this is called from message handler for Owner message

View File

@ -26,7 +26,7 @@ public override void Disconnect()
PostInternalMessage((short)MsgType.Disconnect);
m_Connected = false;
}
m_AsyncConnect = ConnectState.Disconnected;
connectState = ConnectState.Disconnected;
NetworkServer.RemoveLocalClient(m_Connection);
}
@ -35,7 +35,7 @@ internal void InternalConnectLocalServer(bool generateConnectMsg)
m_Connection = new ULocalConnectionToServer();
SetHandlers(m_Connection);
m_Connection.connectionId = NetworkServer.AddLocalClient(this);
m_AsyncConnect = ConnectState.Connected;
connectState = ConnectState.Connected;
SetActive(true);
RegisterSystemHandlers(true);

View File

@ -11,8 +11,6 @@ public class NetworkClient
{
Type m_NetworkConnectionClass = typeof(NetworkConnection);
const int k_MaxEventsPerFrame = 500;
static List<NetworkClient> s_Clients = new List<NetworkClient>();
static bool s_IsActive;
@ -32,20 +30,14 @@ public class NetworkClient
Dictionary<short, NetworkMessageDelegate> m_MessageHandlers = new Dictionary<short, NetworkMessageDelegate>();
protected NetworkConnection m_Connection;
byte[] m_MsgBuffer;
protected enum ConnectState
{
None,
Resolving,
Resolved,
Connecting,
Connected,
Disconnected,
Failed
}
protected ConnectState m_AsyncConnect = ConnectState.None;
string m_RequestedServerHost = "";
protected ConnectState connectState = ConnectState.None;
internal void SetHandlers(NetworkConnection conn)
{
@ -56,7 +48,6 @@ internal void SetHandlers(NetworkConnection conn)
public int serverPort { get { return m_ServerPort; } }
public NetworkConnection connection { get { return m_Connection; } }
internal int hostId { get { return m_ClientId; } }
public Dictionary<short, NetworkMessageDelegate> handlers { get { return m_MessageHandlers; } }
public int numChannels { get { return m_HostTopology.DefaultConfig.ChannelCount; } }
public HostTopology hostTopology { get { return m_HostTopology; }}
@ -75,7 +66,7 @@ public int hostPort
}
}
public bool isConnected { get { return m_AsyncConnect == ConnectState.Connected; }}
public bool isConnected { get { return connectState == ConnectState.Connected; } }
public Type networkConnectionClass { get { return m_NetworkConnectionClass; } }
@ -87,19 +78,17 @@ public void SetNetworkConnectionClass<T>() where T : NetworkConnection
public NetworkClient()
{
if (LogFilter.logDev) { Debug.Log("Client created version " + Version.Current); }
m_MsgBuffer = new byte[NetworkMessage.MaxMessageSize];
AddClient(this);
}
public NetworkClient(NetworkConnection conn)
{
if (LogFilter.logDev) { Debug.Log("Client created version " + Version.Current); }
m_MsgBuffer = new byte[NetworkMessage.MaxMessageSize];
AddClient(this);
SetActive(true);
m_Connection = conn;
m_AsyncConnect = ConnectState.Connected;
connectState = ConnectState.Connected;
conn.SetHandlers(m_MessageHandlers);
RegisterSystemHandlers(false);
}
@ -127,106 +116,24 @@ static bool IsValidIpV6(string address)
public void Connect(string serverIp, int serverPort)
{
PrepareForConnect();
PrepareForConnect(false);
if (LogFilter.logDebug) { Debug.Log("Client Connect: " + serverIp + ":" + serverPort); }
string hostnameOrIp = serverIp;
m_ServerPort = serverPort;
if (Application.platform == RuntimePlatform.WebGLPlayer)
{
m_ServerIp = hostnameOrIp;
m_AsyncConnect = ConnectState.Resolved;
}
else if (serverIp.Equals("127.0.0.1") || serverIp.Equals("localhost"))
{
m_ServerIp = "127.0.0.1";
m_AsyncConnect = ConnectState.Resolved;
}
else if (serverIp.IndexOf(":") != -1 && IsValidIpV6(serverIp))
{
m_ServerIp = serverIp;
m_AsyncConnect = ConnectState.Resolved;
}
else
{
if (LogFilter.logDebug) { Debug.Log("Async DNS START:" + hostnameOrIp); }
m_RequestedServerHost = hostnameOrIp;
m_AsyncConnect = ConnectState.Resolving;
Dns.BeginGetHostAddresses(hostnameOrIp, GetHostAddressesCallback, this);
}
}
public void Connect(EndPoint secureTunnelEndPoint)
{
bool usePlatformSpecificProtocols = NetworkTransport.DoesEndPointUsePlatformProtocols(secureTunnelEndPoint);
PrepareForConnect(usePlatformSpecificProtocols);
if (LogFilter.logDebug) { Debug.Log("Client Connect to remoteSockAddr"); }
if (secureTunnelEndPoint == null)
{
if (LogFilter.logError) { Debug.LogError("Connect failed: null endpoint passed in"); }
m_AsyncConnect = ConnectState.Failed;
return;
}
// Make sure it's either IPv4 or IPv6
if (secureTunnelEndPoint.AddressFamily != AddressFamily.InterNetwork && secureTunnelEndPoint.AddressFamily != AddressFamily.InterNetworkV6)
{
if (LogFilter.logError) { Debug.LogError("Connect failed: Endpoint AddressFamily must be either InterNetwork or InterNetworkV6"); }
m_AsyncConnect = ConnectState.Failed;
return;
}
// Make sure it's an Endpoint we know what to do with
string endPointType = secureTunnelEndPoint.GetType().FullName;
if (endPointType == "System.Net.IPEndPoint")
{
IPEndPoint tmp = (IPEndPoint)secureTunnelEndPoint;
Connect(tmp.Address.ToString(), tmp.Port);
return;
}
if ((endPointType != "UnityEngine.XboxOne.XboxOneEndPoint") && (endPointType != "UnityEngine.PS4.SceEndPoint") && (endPointType != "UnityEngine.PSVita.SceEndPoint"))
{
if (LogFilter.logError) { Debug.LogError("Connect failed: invalid Endpoint (not IPEndPoint or XboxOneEndPoint or SceEndPoint)"); }
m_AsyncConnect = ConnectState.Failed;
return;
}
byte error = 0;
// regular non-relay connect
m_RemoteEndPoint = secureTunnelEndPoint;
m_AsyncConnect = ConnectState.Connecting;
try
{
m_ClientConnectionId = NetworkTransport.ConnectEndPoint(m_ClientId, m_RemoteEndPoint, 0, out error);
}
catch (Exception ex)
{
if (LogFilter.logError) { Debug.LogError("Connect failed: Exception when trying to connect to EndPoint: " + ex); }
m_AsyncConnect = ConnectState.Failed;
return;
}
if (m_ClientConnectionId == 0)
{
if (LogFilter.logError) { Debug.LogError("Connect failed: Unable to connect to EndPoint (" + error + ")"); }
m_AsyncConnect = ConnectState.Failed;
return;
}
connectState = ConnectState.Connecting;
Transport.client.Connect(serverIp, serverPort);
// setup all the handlers
m_ClientConnectionId = 0;
m_Connection = (NetworkConnection)Activator.CreateInstance(m_NetworkConnectionClass);
m_Connection.SetHandlers(m_MessageHandlers);
m_Connection.Initialize(m_ServerIp, m_ClientId, m_ClientConnectionId, m_HostTopology);
}
void PrepareForConnect()
{
PrepareForConnect(false);
}
void PrepareForConnect(bool usePlatformSpecificProtocols)
{
SetActive(true);
@ -241,69 +148,27 @@ void PrepareForConnect(bool usePlatformSpecificProtocols)
m_HostTopology = new HostTopology(config, 8);
}
m_ClientId = NetworkTransport.AddHost(m_HostTopology, m_HostPort);
}
// this called in another thread! Cannot call Update() here.
internal static void GetHostAddressesCallback(IAsyncResult ar)
{
try
{
IPAddress[] ip = Dns.EndGetHostAddresses(ar);
NetworkClient client = (NetworkClient)ar.AsyncState;
if (ip.Length == 0)
{
if (LogFilter.logError) { Debug.LogError("DNS lookup failed for:" + client.m_RequestedServerHost); }
client.m_AsyncConnect = ConnectState.Failed;
return;
}
client.m_ServerIp = ip[0].ToString();
client.m_AsyncConnect = ConnectState.Resolved;
if (LogFilter.logDebug) { Debug.Log("Async DNS Result:" + client.m_ServerIp + " for " + client.m_RequestedServerHost + ": " + client.m_ServerIp); }
}
catch (SocketException e)
{
NetworkClient client = (NetworkClient)ar.AsyncState;
if (LogFilter.logError) { Debug.LogError("DNS resolution failed: " + e.GetErrorCode()); }
if (LogFilter.logDebug) { Debug.Log("Exception:" + e); }
client.m_AsyncConnect = ConnectState.Failed;
}
}
internal void ContinueConnect()
{
byte error;
// regular non-relay connect
m_ClientConnectionId = NetworkTransport.Connect(m_ClientId, m_ServerIp, m_ServerPort, 0, out error);
m_Connection = (NetworkConnection)Activator.CreateInstance(m_NetworkConnectionClass);
m_Connection.SetHandlers(m_MessageHandlers);
m_Connection.Initialize(m_ServerIp, m_ClientId, m_ClientConnectionId, m_HostTopology);
m_ClientId = 0; // NetworkTransport.AddHost 'Returns the ID of the host that was created.'
}
public virtual void Disconnect()
{
m_AsyncConnect = ConnectState.Disconnected;
connectState = ConnectState.Disconnected;
ClientScene.HandleClientDisconnect(m_Connection);
if (m_Connection != null)
{
m_Connection.Disconnect();
m_Connection.Dispose();
m_Connection = null;
if (m_ClientId != -1)
{
NetworkTransport.RemoveHost(m_ClientId);
m_ClientId = -1;
}
}
}
public bool SendByChannel(short msgType, MessageBase msg, int channelId)
{
if (m_Connection != null)
{
if (m_AsyncConnect != ConnectState.Connected)
if (connectState != ConnectState.Connected)
{
if (LogFilter.logError) { Debug.LogError("NetworkClient SendByChannel when not connected to a server"); }
return false;
@ -319,11 +184,7 @@ public bool SendByChannel(short msgType, MessageBase msg, int channelId)
public void Shutdown()
{
if (LogFilter.logDebug) Debug.Log("Shutting down client " + m_ClientId);
if (m_ClientId != -1)
{
NetworkTransport.RemoveHost(m_ClientId);
m_ClientId = -1;
}
RemoveClient(this);
if (s_Clients.Count == 0)
{
@ -333,126 +194,53 @@ public void Shutdown()
internal virtual void Update()
{
//Debug.Log("NetworkClient.Update" + m_ClientId + " connectstate=" + connectState);
if (m_ClientId == -1)
{
return;
}
switch (m_AsyncConnect)
// don't do anything if we aren't fully connected
// -> we don't check Client.Connected because then we wouldn't
// process the last disconnect message.
if (connectState != ConnectState.Connecting && connectState != ConnectState.Connected)
{
case ConnectState.None:
case ConnectState.Resolving:
case ConnectState.Disconnected:
return;
case ConnectState.Failed:
GenerateConnectError((int)NetworkError.DNSFailure);
m_AsyncConnect = ConnectState.Disconnected;
return;
case ConnectState.Resolved:
m_AsyncConnect = ConnectState.Connecting;
ContinueConnect();
return;
case ConnectState.Connecting:
case ConnectState.Connected:
{
break;
}
}
int numEvents = 0;
NetworkEventType networkEvent;
do
{
int connectionId;
int channelId;
int receivedSize;
byte error;
networkEvent = NetworkTransport.ReceiveFromHost(m_ClientId, out connectionId, out channelId, m_MsgBuffer, (ushort)m_MsgBuffer.Length, out receivedSize, out error);
if (m_Connection != null) m_Connection.lastError = (NetworkError)error;
if (networkEvent != NetworkEventType.Nothing)
{
if (LogFilter.logDev) { Debug.Log("Client event: host=" + m_ClientId + " event=" + networkEvent + " error=" + error); }
}
switch (networkEvent)
{
case NetworkEventType.ConnectEvent:
if (LogFilter.logDebug) { Debug.Log("Client connected"); }
if (error != 0)
{
GenerateConnectError(error);
return;
}
m_AsyncConnect = ConnectState.Connected;
//Debug.Log("+++NetworkClient.Update calls NetworkTransport.ReceiveFromHost");
// any new message?
// -> calling it once per frame is okay, but really why not just
// process all messages and make it empty..
Telepathy.Message msg;
while (Transport.client.GetNextMessage(out msg))
{
switch (msg.eventType)
{
case Telepathy.EventType.Connected:
Debug.Log("NetworkClient loop: Connected");
m_Connection.InvokeHandlerNoData((short)MsgType.Connect);
connectState = ConnectState.Connected;
break;
case NetworkEventType.DataEvent:
if (error != 0)
{
GenerateDataError(error);
return;
}
#if UNITY_EDITOR
UnityEditor.NetworkDetailStats.IncrementStat(
UnityEditor.NetworkDetailStats.NetworkDirection.Incoming,
(short)MsgType.LLAPIMsg, "msg", 1);
#endif
// create a buffer with exactly 'receivedSize' size for the handlers so we don't need to read
// a size header (saves bandwidth)
byte[] data = new byte[receivedSize];
Array.Copy(m_MsgBuffer, data, receivedSize);
m_Connection.TransportReceive(data, channelId);
case Telepathy.EventType.Data:
Debug.Log("NetworkClient loop: Data: " + BitConverter.ToString(msg.data));
m_Connection.TransportReceive(msg.data, 0);
break;
case Telepathy.EventType.Disconnected:
Debug.Log("NetworkClient loop: Disconnected");
connectState = ConnectState.Disconnected;
case NetworkEventType.DisconnectEvent:
if (LogFilter.logDebug) { Debug.Log("Client disconnected"); }
m_AsyncConnect = ConnectState.Disconnected;
if (error != 0)
{
if ((NetworkError)error != NetworkError.Timeout)
{
GenerateDisconnectError(error);
}
}
//GenerateDisconnectError(error); TODO which one?
ClientScene.HandleClientDisconnect(m_Connection);
if (m_Connection != null)
{
m_Connection.InvokeHandlerNoData((short)MsgType.Disconnect);
}
break;
case NetworkEventType.Nothing:
break;
default:
if (LogFilter.logError) { Debug.LogError("Unknown network message type received: " + networkEvent); }
break;
}
if (++numEvents >= k_MaxEventsPerFrame)
{
if (LogFilter.logDebug) { Debug.Log("MaxEventsPerFrame hit (" + k_MaxEventsPerFrame + ")"); }
break;
}
if (m_ClientId == -1)
{
break;
}
}
while (networkEvent != NetworkEventType.Nothing);
}
void GenerateConnectError(byte error)
@ -501,8 +289,10 @@ public int GetRTT()
if (m_ClientId == -1)
return 0;
byte err;
return NetworkTransport.GetCurrentRTT(m_ClientId, m_ClientConnectionId, out err);
// TODO
//return NetworkTransport.GetCurrentRTT(m_ClientId, m_ClientConnectionId, out err);
Debug.Log("NetworkClient.GetRTT calls NetworkTransport.GetCurrentRTT");
return 0;
}
internal void RegisterSystemHandlers(bool localClient)
@ -559,14 +349,6 @@ static public void ShutdownAll()
internal static void SetActive(bool state)
{
// what is this check?
//if (state == false && s_Clients.Count != 0)
// return;
if (!s_IsActive && state)
{
NetworkTransport.Init();
}
s_IsActive = state;
}
};

View File

@ -85,8 +85,9 @@ public void Disconnect()
{
return;
}
byte error;
NetworkTransport.Disconnect(hostId, connectionId, out error);
Debug.Log("NetworkConnection.Disconnect calls NetworkTransport.Disconnect");
if (Transport.client.Connected) Transport.client.Disconnect();
RemoveObservers();
}
@ -314,21 +315,19 @@ public virtual void TransportReceive(byte[] bytes, int channelId)
public virtual bool TransportSend(byte[] bytes, int channelId, out byte error)
{
// try sending
if (NetworkTransport.Send(hostId, connectionId, channelId, bytes, bytes.Length, out error))
error = 0;
if (Transport.client.Connected)
{
Transport.client.Send(bytes);
return true;
}
else
else if (Transport.server.Active)
{
// log error, but ignore disconnect errors. they are expected, people quit sometimes.
if ((NetworkError)error != NetworkError.WrongConnection && (NetworkError)error != NetworkError.Timeout)
{
if (LogFilter.logError) { Debug.LogError("SendToTransport failed. error:" + (NetworkError)error + " channel:" + channelId + " bytesToSend:" + bytes.Length); }
Transport.server.Send((uint)connectionId, bytes);
return true;
}
return false;
}
}
internal void AddOwnedObject(NetworkIdentity obj)
{

View File

@ -83,8 +83,6 @@ public class NetworkManager : MonoBehaviour
public int maxConnections { get { return m_MaxConnections; } set { m_MaxConnections = value; } }
public List<QosType> channels { get { return m_Channels; } }
public EndPoint secureTunnelEndpoint { get { return m_EndPoint; } set { m_EndPoint = value; } }
public bool useWebSockets { get { return m_UseWebSockets; } set { m_UseWebSockets = value; } }
public bool clientLoadedScene { get { return m_ClientLoadedScene; } set { m_ClientLoadedScene = value; } }
@ -184,6 +182,17 @@ void InitializeSingleton()
}
}
// NetworkIdentity.UNetStaticUpdate is called from UnityEngine while LLAPI network is active.
// if we want TCP then we need to call it manually. probably best from NetworkManager, although this means
// that we can't use NetworkServer/NetworkClient without a NetworkManager invoking Update anymore.
void LateUpdate()
{
// call it while the NetworkManager exists.
// -> we don't only call while Client/Server.Connected, because then we would stop if disconnected and the
// NetworkClient wouldn't receive the last Disconnect event, result in all kinds of issues
NetworkIdentity.UNetStaticUpdate();
}
void OnValidate()
{
m_MaxConnections = Mathf.Clamp(m_MaxConnections, 1, 32000); // [1, 32000]
@ -252,7 +261,7 @@ bool StartServer(ConnectionConfig config, int maxConnections)
if (m_GlobalConfig != null)
{
NetworkTransport.Init(m_GlobalConfig);
//NetworkTransport.Init(m_GlobalConfig);
}
// passing a config overrides setting the connectionConfig property
@ -368,7 +377,7 @@ public NetworkClient StartClient(ConnectionConfig config, int hostPort)
if (m_GlobalConfig != null)
{
NetworkTransport.Init(m_GlobalConfig);
//NetworkTransport.Init(m_GlobalConfig);
}
client = new NetworkClient();
@ -397,13 +406,7 @@ public NetworkClient StartClient(ConnectionConfig config, int hostPort)
}
RegisterClientMessages(client);
if (m_EndPoint != null)
{
if (LogFilter.logDebug) { Debug.Log("NetworkManager StartClient using provided SecureTunnel"); }
client.Connect(m_EndPoint);
}
else
{
if (string.IsNullOrEmpty(m_NetworkAddress))
{
if (LogFilter.logError) { Debug.LogError("Must set the Network Address field in the manager"); }
@ -412,7 +415,6 @@ public NetworkClient StartClient(ConnectionConfig config, int hostPort)
if (LogFilter.logDebug) { Debug.Log("NetworkManager StartClient address:" + m_NetworkAddress + " port:" + m_NetworkPort); }
client.Connect(m_NetworkAddress, m_NetworkPort);
}
OnStartClient(client);
s_Address = m_NetworkAddress;
@ -629,7 +631,8 @@ internal static void UpdateScene()
singleton.StopHost();
NetworkTransport.Shutdown();
Transport.client.Disconnect();
Transport.server.Stop();
}
#endif

View File

@ -23,7 +23,6 @@ public sealed class NetworkServer
static int s_ServerHostId = -1;
static int s_ServerPort = -1;
static HostTopology s_HostTopology;
static byte[] s_MsgBuffer = new byte[NetworkMessage.MaxMessageSize];
static bool s_UseWebSockets;
static bool s_Initialized = false;
@ -71,8 +70,7 @@ static public bool Configure(HostTopology topology)
public static void Reset()
{
NetworkTransport.Shutdown();
NetworkTransport.Init();
Debug.Log("NetworkServer.Reset calls NetworkTransport.Shutdown and Init");
s_Active = false;
}
@ -88,7 +86,7 @@ public static void Shutdown()
}
else
{
NetworkTransport.RemoveHost(s_ServerHostId);
Transport.server.Stop();
s_ServerHostId = -1;
}
@ -104,11 +102,8 @@ public static void Initialize()
return;
s_Initialized = true;
NetworkTransport.Init();
if (LogFilter.logDev) { Debug.Log("NetworkServer Created version " + Version.Current); }
s_MsgBuffer = new byte[NetworkMessage.MaxMessageSize];
if (s_HostTopology == null)
{
var config = new ConnectionConfig();
@ -157,11 +152,15 @@ static internal bool InternalListen(string ipAddress, int serverPort)
if (s_UseWebSockets)
{
s_ServerHostId = NetworkTransport.AddWebsocketHost(s_HostTopology, serverPort, ipAddress);
// TODO
Debug.LogWarning("TODO Transport.StartWebGL?");
//s_ServerHostId = NetworkTransport.AddWebsocketHost(s_HostTopology, serverPort, ipAddress);
}
else
{
s_ServerHostId = NetworkTransport.AddHost(s_HostTopology, serverPort, ipAddress);
Debug.Log("NetworkServer.InternalListen calls NetworkTransport.AddHost port=" + serverPort);
Transport.server.Start(serverPort);
s_ServerHostId = 0; // so it doesn't return false
}
if (s_ServerHostId == -1)
@ -401,55 +400,28 @@ static internal void InternalUpdate()
if (s_ServerHostId == -1)
return;
int connectionId;
int channelId;
int receivedSize;
byte error;
//Debug.Log("NetworkServer.InternalUpdate calls NetworkTransport.ReceiveFromHost");
var networkEvent = NetworkEventType.DataEvent;
Telepathy.Message message;
while (Transport.server.GetNextMessage(out message))
{
//Debug.Log("NetworkServer.InternalUpdate new message: " + eventType + " " + (data != null ? BitConverter.ToString(data) : ""));
do
switch (message.eventType)
{
networkEvent = NetworkTransport.ReceiveFromHost(s_ServerHostId, out connectionId, out channelId, s_MsgBuffer, (ushort)s_MsgBuffer.Length, out receivedSize, out error);
if (networkEvent != NetworkEventType.Nothing)
{
if (LogFilter.logDev) { Debug.Log("Server event: host=" + s_ServerHostId + " event=" + networkEvent + " error=" + error); }
}
switch (networkEvent)
{
case NetworkEventType.ConnectEvent:
{
HandleConnect(connectionId, error);
case Telepathy.EventType.Connected:
HandleConnect((int)message.connectionId, 0);
break;
}
case NetworkEventType.DataEvent:
{
// create a buffer with exactly 'receivedSize' size for the handlers so we don't need to read
// a size header (saves bandwidth)
byte[] data = new byte[receivedSize];
Array.Copy(s_MsgBuffer, data, receivedSize);
HandleData(connectionId, data, channelId, error);
case Telepathy.EventType.Data:
//Debug.Log(message.connectionId + " Data: " + BitConverter.ToString(message.data));
HandleData((int)message.connectionId, message.data, 0, 0);
break;
}
case NetworkEventType.DisconnectEvent:
{
HandleDisconnect(connectionId, error);
break;
}
case NetworkEventType.Nothing:
{
break;
}
default:
{
if (LogFilter.logError) { Debug.LogError("Unknown network message type received: " + networkEvent); }
case Telepathy.EventType.Disconnected:
Console.WriteLine(message.connectionId + " Disconnected");
HandleDisconnect((int)message.connectionId, 0);
break;
}
}
}
while (networkEvent != NetworkEventType.Nothing);
UpdateServerObjects();
}
@ -464,18 +436,11 @@ static void HandleConnect(int connectionId, byte error)
return;
}
string address;
int port;
NetworkID networkId;
NodeID node;
byte error2;
NetworkTransport.GetConnectionInfo(s_ServerHostId, connectionId, out address, out port, out networkId, out node, out error2);
// add player info
NetworkConnection conn = (NetworkConnection)Activator.CreateInstance(s_NetworkConnectionClass);
conn.SetHandlers(s_MessageHandlers);
conn.Initialize(address, s_ServerHostId, connectionId, s_HostTopology);
conn.lastError = (NetworkError)error2;
conn.Initialize("TODO_ADDRESS_FROM_TCP", s_ServerHostId, connectionId, s_HostTopology);
conn.lastError = (NetworkError)0;
// add connection at correct index
while (s_Connections.Count <= connectionId)

View File

@ -0,0 +1,99 @@
using System;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class Client : Common
{
TcpClient client = new TcpClient();
Thread thread;
public bool Connecting
{
get { return thread != null && thread.IsAlive && !client.Connected; }
}
public bool Connected
{
get { return thread != null && thread.IsAlive && client.Connected; }
}
// the thread function
void ThreadFunction(string ip, int port)
{
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// connect (blocking)
// (NoDelay disables nagle algorithm. lowers CPU% and latency)
client.NoDelay = true;
client.Connect(ip, port);
// run the receive loop
ReceiveLoop(0, client);
}
catch (SocketException exception)
{
// this happens if (for example) the ip address is correct
// but there is no server running on that ip/port
Logger.Log("Client: failed to connect to ip=" + ip + " port=" + port + " reason=" + exception);
// clean up properly before exiting
client.Close();
}
catch (Exception exception)
{
// something went wrong. probably important.
Logger.LogError("Client Exception: " + exception);
}
}
public void Connect(string ip, int port)
{
// not if already started
if (Connecting || Connected) return;
// clear old messages in queue, just to be sure that the caller
// doesn't receive data from last time and gets out of sync.
// -> calling this in Disconnect isn't smart because the caller may
// still want to process all the latest messages afterwards
messageQueue.Clear();
// client.Connect(ip, port) is blocking. let's call it in the thread
// and return immediately.
// -> this way the application doesn't hang for 30s if connect takes
// too long, which is especially good in games
// -> this way we don't async client.BeginConnect, which seems to
// fail sometimes if we connect too many clients too fast
thread = new Thread(() => { ThreadFunction(ip, port); });
thread.IsBackground = true;
thread.Start();
}
public void Disconnect()
{
// only if started
if (!Connecting && !Connected) return;
Logger.Log("Client: disconnecting");
// this is supposed to disconnect gracefully, but the blocking Read
// calls throw a 'Read failure' exception instead of returning 0.
// (maybe it's Unity? maybe Mono?)
client.GetStream().Close();
client.Close();
}
public bool Send(byte[] data)
{
if (Connected)
{
return SendMessage(client.GetStream(), data);
}
Logger.LogWarning("Client.Send: not connected!");
return false;
}
}
}

View File

@ -0,0 +1,187 @@
// common code used by server and client
using System;
using System.Net.Sockets;
namespace Telepathy
{
public abstract class Common
{
// common code /////////////////////////////////////////////////////////
// connectionId counter
// (right now we only use it from one listener thread, but we might have
// multiple threads later in case of WebSockets etc.)
// -> static so that another server instance doesn't start at 0 again.
protected static SafeCounter counter = new SafeCounter();
// incoming message queue of <connectionId, message>
// (not a HashSet because one connection can have multiple new messages)
protected SafeQueue<Message> messageQueue = new SafeQueue<Message>();
// warning if message queue gets too big
// if the average message is about 20 bytes then:
// - 1k messages are 20KB
// - 10k messages are 200KB
// - 100k messages are 1.95MB
// 2MB are not that much, but it is a bad sign if the caller process
// can't call GetNextMessage faster than the incoming messages.
public int messageQueueSizeWarning = 100000;
DateTime messageQueueLastWarning = DateTime.Now;
// removes and returns the oldest message from the message queue.
// (might want to call this until it doesn't return anything anymore)
// -> Connected, Data, Disconnected events are all added here
// -> bool return makes while (GetMessage(out Message)) easier!
public bool GetNextMessage(out Message message)
{
return messageQueue.TryDequeue(out message);
}
// static helper functions /////////////////////////////////////////////
// fast ushort to byte[] conversion and vice versa
// -> test with 100k conversions:
// BitConverter.GetBytes(ushort): 144ms
// bit shifting: 11ms
// -> 10x speed improvement makes this optimization actually worth it
// -> this way we don't need to allocate BinaryWriter/Reader either
static byte[] UShortToBytes(ushort value)
{
return new byte[]
{
(byte)value,
(byte)(value >> 8)
};
}
static ushort BytesToUShort(byte[] bytes)
{
return (ushort)((bytes[1] << 8) + bytes[0]);
}
// send message (via stream) with the <size,content> message structure
protected static bool SendMessage(NetworkStream stream, byte[] content)
{
// can we still write to this socket (not disconnected?)
if (!stream.CanWrite)
{
Logger.LogWarning("Send: stream not writeable: " + stream);
return false;
}
// check size
if (content.Length > ushort.MaxValue)
{
Logger.LogError("Send: message too big(" + content.Length + ") max=" + ushort.MaxValue);
return false;
}
// stream.Write throws exceptions if client sends with high frequency
// and the server stops
try
{
// write size header and content
byte[] header = UShortToBytes((ushort)content.Length);
stream.Write(header, 0, header.Length);
stream.Write(content, 0, content.Length);
stream.Flush();
return true;
}
catch (Exception exception)
{
// log as regular message because servers do shut down sometimes
Logger.Log("Send: stream.Write exception: " + exception);
return false;
}
}
// read message (via stream) with the <size,content> message structure
protected static bool ReadMessageBlocking(NetworkStream stream, out byte[] content)
{
content = null;
// read exactly 2 bytes for header (blocking)
byte[] header = new byte[2];
if (!stream.ReadExactly(header, 2))
return false;
ushort size = BytesToUShort(header);
// read exactly 'size' bytes for content (blocking)
content = new byte[size];
if (!stream.ReadExactly(content, size))
return false;
return true;
}
// thread receive function is the same for client and server's clients
protected void ReceiveLoop(uint connectionId, TcpClient client)
{
// get NetworkStream from client
NetworkStream stream = client.GetStream();
// absolutely must wrap with try/catch, otherwise thread exceptions
// are silent
try
{
// add connected event to queue
messageQueue.Enqueue(new Message(connectionId, EventType.Connected, null));
// let's talk about reading data.
// -> normally we would read as much as possible and then
// extract as many <size,content>,<size,content> messages
// as we received this time. this is really complicated
// and expensive to do though
// -> instead we use a trick:
// Read(2) -> size
// Read(size) -> content
// repeat
// Read is blocking, but it doesn't matter since the
// best thing to do until the full message arrives,
// is to wait.
// => this is the most elegant AND fast solution.
// + no resizing
// + no extra allocations, just one for the content
// + no crazy extraction logic
while (true)
{
// read the next message (blocking) or stop if stream closed
byte[] content;
if (!ReadMessageBlocking(stream, out content))
break;
// queue it
messageQueue.Enqueue(new Message(connectionId, EventType.Data, content));
// and show a warning if the queue gets too big
// -> we don't want to show a warning every single time,
// because then a lot of processing power gets wasted on
// logging, which will make the queue pile up even more.
// -> instead we show it every 10s, so that the system can
// use most it's processing power to hopefully process it.
if (messageQueue.Count > messageQueueSizeWarning)
{
TimeSpan elapsed = DateTime.Now - messageQueueLastWarning;
if (elapsed.TotalSeconds > 10)
{
Logger.LogWarning("ReceiveLoop: messageQueue is getting big(" + messageQueue.Count + "), try calling GetNextMessage more often. You can call it more than once per frame!");
messageQueueLastWarning = DateTime.Now;
}
}
}
}
catch (Exception exception)
{
// something went wrong. the thread was interrupted or the
// connection closed or we closed our own connection or ...
// -> either way we should stop gracefully
Logger.Log("ReceiveLoop: finished receive function for connectionId=" + connectionId + " reason: " + exception);
}
// if we got here then either the client while loop ended, or an
// exception happened. disconnect
messageQueue.Enqueue(new Message(connectionId, EventType.Disconnected, null));
// clean up no matter what
stream.Close();
client.Close();
}
}
}

View File

@ -0,0 +1,9 @@
namespace Telepathy
{
public enum EventType
{
Connected,
Data,
Disconnected
}
}

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2018, vis2k
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,32 @@
// A simple logger class that uses Console.WriteLine by default.
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
// (this way we don't have to depend on UnityEngine.DLL and don't need a
// different version for every UnityEngine version here)
using System;
namespace Telepathy
{
public static class Logger
{
// log regular
public static Action<string> LogMethod = Console.WriteLine;
public static void Log(string msg)
{
LogMethod(msg);
}
// log warning
public static Action<string> LogWarningMethod = Console.WriteLine;
public static void LogWarning(string msg)
{
LogWarningMethod(msg);
}
// log error
public static Action<string> LogErrorMethod = Console.Error.WriteLine;
public static void LogError(string msg)
{
LogErrorMethod(msg);
}
}
}

View File

@ -0,0 +1,17 @@
// incoming message queue of <connectionId, message>
// (not a HashSet because one connection can have multiple new messages)
namespace Telepathy
{
public struct Message
{
public uint connectionId;
public EventType eventType;
public byte[] data;
public Message(uint connectionId, EventType eventType, byte[] data)
{
this.connectionId = connectionId;
this.eventType = eventType;
this.data = data;
}
}
}

View File

@ -0,0 +1,55 @@
using System.IO;
using System.Net.Sockets;
public static class NetworkStreamExtensions
{
// .Read returns '0' if remote closed the connection but throws an
// IOException if we voluntarily closed our own connection.
//
// lets's add a ReadSafely method that returns '0' in both cases so we don't
// have to worry about exceptions, since a disconnect is a disconnect...
public static int ReadSafely(this NetworkStream stream, byte[] buffer, int offset, int size)
{
try
{
return stream.Read(buffer, offset, size);
}
catch (IOException)
{
return 0;
}
}
// helper function to read EXACTLY 'n' bytes
// -> default .Read reads up to 'n' bytes. this function reads exactly 'n'
// bytes
// -> this is blocking until 'n' bytes were received
// -> immediately returns false in case of disconnects
public static bool ReadExactly(this NetworkStream stream, byte[] buffer, int amount)
{
// there might not be enough bytes in the TCP buffer for .Read to read
// the whole amount at once, so we need to keep trying until we have all
// the bytes (blocking)
//
// note: this just is a faster version of reading one after another:
// for (int i = 0; i < amount; ++i)
// if (stream.Read(buffer, i, 1) == 0)
// return false;
// return true;
int bytesRead = 0;
while (bytesRead < amount)
{
// read up to 'remaining' bytes with the 'safe' read extension
int remaining = amount - bytesRead;
int result = stream.ReadSafely(buffer, bytesRead, remaining);
// .Read returns 0 if disconnected
if (result == 0)
return false;
// otherwise add to bytes read
bytesRead += result;
}
return true;
}
}

View File

@ -0,0 +1,297 @@
![Telepathy Logo](https://i.imgur.com/eUk2rmT.png)
[![Build status](https://img.shields.io/appveyor/ci/vis2k73562/telepathy.svg)](https://ci.appveyor.com/project/vis2k73562/telepathy/)
[![AppVeyor tests branch](https://img.shields.io/appveyor/tests/vis2k73562/telepathy.svg)](https://ci.appveyor.com/project/vis2k73562/telepathy/branch/master/tests)
[![Discord](https://img.shields.io/discord/343440455738064897.svg)](https://discordapp.com/invite/N9QVxbM)
[![Codecov](https://codecov.io/gh/vis2k/telepathy/graph/badge.svg)](https://codecov.io/gh/vis2k/telepathy)
Simple, message based, MMO Scale TCP networking in C#. And no magic.
Telepathy was designed with the [KISS Principle](https://en.wikipedia.org/wiki/KISS_principle) in mind.<br/>
Telepathy is fast and extremely reliable, designed for [MMO](https://www.assetstore.unity3d.com/#!/content/51212) scale Networking.<br/>
Telepathy uses framing, so anything sent will be received the same way.<br/>
Telepathy is raw C# and can be used in Unity3D too.<br/>
# What makes Telepathy special?
Telepathy was originally designed for [uMMORPG](https://www.assetstore.unity3d.com/#!/content/51212) after 3 years in UDP hell.
We needed a library that is:
* Stable & Bug free: Telepathy uses only 400 lines of code. There is no magic.
* High performance: Telepathy can handle thousands of connections and packages.
* Concurrent: Telepathy uses one thread per connection. It can make heavy use of multi core processors.
* Simple: Telepathy takes care of everything. All you need to do is call Connect/GetNextMessage/Disconnect.
* Message based: if we send 10 and then 2 bytes, then the other end receives 10 and then 2 bytes, never 12 at once.
MMORPGs are insanely difficult to make and we created Telepathy so that we would never have to worry about low level Networking again.
# What about...
* Async Sockets: didn't perform better in our benchmarks.
* ConcurrentQueue: .NET 3.5 compatibility is important for Unity. Wasn't faster than our SafeQueue anyway.
* UDP vs. TCP: Minecraft and World of Warcraft are two of the biggest multiplayer games of all time and they both use TCP networking. There is a reason for that.
# Using the Telepathy Server
```C#
// create and start the server
Telepathy.Server server = new Telepathy.Server();
server.Start(1337);
// grab all new messages. do this in your Update loop.
Telepathy.Message msg;
while (server.GetNextMessage(out msg))
{
switch (msg.eventType)
{
case Telepathy.EventType.Connect:
Console.WriteLine(msg.connectionId + " Connected");
break;
case Telepathy.EventType.Data:
Console.WriteLine(msg.connectionId + " Data: " + BitConverter.ToString(msg.data));
break;
case Telepathy.EventType.Disconnect:
Console.WriteLine(msg.connectionId + " Disconnected");
break;
}
}
// send a message to client with connectionId = 0 (first one)
server.Send(0, new byte[]{0x42, 0x1337});
// stop the server when you don't need it anymore
server.Stop();
```
# Using the Telepathy Client
```C#
// create and connect the client
Telepathy.Client Client = new Telepathy.Client();
client.Connect("localhost", 1337);
// grab all new messages. do this in your Update loop.
Telepathy.Message msg;
while (client.GetNextMessage(out msg))
{
switch (msg.eventType)
{
case Telepathy.EventType.Connect:
Console.WriteLine("Connected");
break;
case Telepathy.EventType.Data:
Console.WriteLine("Data: " + BitConverter.ToString(msg.data));
break;
case Telepathy.EventType.Disconnect:
Console.WriteLine("Disconnected");
break;
}
}
// send a message to server
client.Send(new byte[]{0xFF});
// disconnect from the server when we are done
client.Disconnect();
```
# Unity Integration
Here is a very simple MonoBehaviour script for Unity. It's really just the above code with logging configured for Unity's Debug.Log:
```C#
using System;
using UnityEngine;
public class SimpleExample : MonoBehaviour
{
Telepathy.Client client = new Telepathy.Client();
Telepathy.Server server = new Telepathy.Server();
void Awake()
{
// update even if window isn't focused, otherwise we don't receive.
Application.runInBackground = true;
// use Debug.Log functions for Telepathy so we can see it in the console
Telepathy.Logger.LogMethod = Debug.Log;
Telepathy.Logger.LogWarningMethod = Debug.LogWarning;
Telepathy.Logger.LogErrorMethod = Debug.LogError;
}
void Update()
{
// client
if (client.Connected)
{
if (Input.GetKeyDown(KeyCode.Space))
client.Send(new byte[]{0x1});
// show all new messages
Telepathy.Message msg;
while (client.GetNextMessage(out msg))
{
switch (msg.eventType)
{
case Telepathy.EventType.Connected:
Console.WriteLine("Connected");
break;
case Telepathy.EventType.Data:
Console.WriteLine("Data: " + BitConverter.ToString(msg.data));
break;
case Telepathy.EventType.Disconnected:
Console.WriteLine("Disconnected");
break;
}
}
}
// server
if (server.Active)
{
if (Input.GetKeyDown(KeyCode.Space))
server.Send(0, new byte[]{0x2});
// show all new messages
Telepathy.Message msg;
while (server.GetNextMessage(out msg))
{
switch (msg.eventType)
{
case Telepathy.EventType.Connected:
Console.WriteLine(msg.connectionId + " Connected");
break;
case Telepathy.EventType.Data:
Console.WriteLine(msg.connectionId + " Data: " + BitConverter.ToString(msg.data));
break;
case Telepathy.EventType.Disconnected:
Console.WriteLine(msg.connectionId + " Disconnected");
break;
}
}
}
}
void OnGUI()
{
// client
GUI.enabled = !client.Connected;
if (GUI.Button(new Rect(0, 0, 120, 20), "Connect Client"))
client.Connect("localhost", 1337);
GUI.enabled = client.Connected;
if (GUI.Button(new Rect(130, 0, 120, 20), "Disconnect Client"))
client.Disconnect();
// server
GUI.enabled = !server.Active;
if (GUI.Button(new Rect(0, 25, 120, 20), "Start Server"))
server.Start(1337);
GUI.enabled = server.Active;
if (GUI.Button(new Rect(130, 25, 120, 20), "Stop Server"))
server.Stop();
GUI.enabled = true;
}
void OnApplicationQuit()
{
// the client/server threads won't receive the OnQuit info if we are
// running them in the Editor. they would only quit when we press Play
// again later. this is fine, but let's shut them down here for consistency
client.Disconnect();
server.Stop();
}
}
```
Make sure to enable 'run in Background' for your project settings, which is a must for all multiplayer games.
Then build it, start the server in the build and the client in the Editor and press Space to send a test message.
# Benchmarks
**Real World**<br/>
Telepathy is constantly tested in production with [uMMORPG](https://www.assetstore.unity3d.com/#!/content/51212).
We [recently tested](https://docs.google.com/document/d/e/2PACX-1vQqf_iqOLlBRTUqqyor_OUx_rHlYx-SYvZWMvHGuLIuRuxJ-qX3s8JzrrBB5vxDdGfl-HhYZW3g5lLW/pub#h.h4wha2mpetsc) 100+ players all broadcasting to each other in the worst case scenario, without issues.
We had to stop the test because we didn't have more players to spawn clients.<br/>
The next huge test will come soon...
**Connections Test**<br/>
We also test only the raw Telepathy library by spawing 1 server and 1000 clients, each client sending 100 bytes 14 times per second and the server echoing the same message back to each client. This test should be a decent example for an MMORPG scenario and allows us to test if the raw Telepathy library can handle it.
Test Computer: 2015 Macbook Pro with a 2,2 GHz Intel Core i7 processor.<br/>
Test Results:<br/>
| Clients | CPU Usage | Ram Usage | Bandwidth Client+Server | Result |
| ------- | ----------| --------- | ------------------------ | ------ |
| 128 | 7% | 26 MB | 1-2 MB/s | Passed |
| 500 | 28% | 51 MB | 3-4 MB/s | Passed |
| 1000 | 42% | 75 MB | 3-5 MB/s | Passed |
_Note: results will be significantly better on a really powerful server. Tests will follow._
The Connections Test can be reproduced with the following code:<br/>
```C#
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Telepathy;
public class Test
{
static void Main()
{
// start server
Server server = new Server();
server.Start(1337);
int serverFrequency = 60;
Thread serverThread = new Thread(() =>
{
Logger.Log("started server");
while (true)
{
// reply to each incoming message
Message msg;
while (server.GetNextMessage(out msg))
{
if (msg.eventType == EventType.Data)
server.Send(msg.connectionId, msg.data);
}
// sleep
Thread.Sleep(1000 / serverFrequency);
}
});
serverThread.IsBackground = false;
serverThread.Start();
// start n clients and get queue messages all in this thread
int clientAmount = 1000;
string message = "Sometimes we just need a good networking library";
byte[] messageBytes = Encoding.ASCII.GetBytes(message);
int clientFrequency = 14;
List<Client> clients = new List<Client>();
for (int i = 0; i < clientAmount; ++i)
{
Client client = new Client();
client.Connect("localhost", 1337);
clients.Add(client);
Thread.Sleep(15);
}
Logger.Log("started all clients");
while (true)
{
foreach (Client client in clients)
{
// send 2 messages each time
client.Send(messageBytes);
client.Send(messageBytes);
// get new messages from queue
Message msg;
while (client.GetNextMessage(out msg))
{
}
}
// client tick rate
Thread.Sleep(1000 / clientFrequency);
}
}
}
```

View File

@ -0,0 +1,30 @@
// a very simple locked 'uint' counter
// (we can't do lock(int) so we need an object and since we also need a max
// check, we might as well put it into a class here)
using System;
namespace Telepathy
{
public class SafeCounter
{
uint counter;
public uint Next()
{
lock (this)
{
// it's very unlikely that we reach the uint limit of 4 billion.
// even with 1 new connection per second, this would take 136 years.
// -> but if it happens, then we should throw an exception because
// the caller probably should stop accepting clients.
// -> it's hardly worth using 'bool Next(out id)' for that case
// because it's just so unlikely.
if (counter == uint.MaxValue)
{
throw new Exception("SafeCounter limit reached: " + counter);
}
return counter++;
}
}
}
}

View File

@ -0,0 +1,53 @@
// replaces ConcurrentDictionary which is not available in .NET 3.5 yet.
using System.Collections.Generic;
using System.Linq;
namespace Telepathy
{
public class SafeDictionary<TKey,TValue>
{
Dictionary<TKey,TValue> dict = new Dictionary<TKey,TValue>();
public void Add(TKey key, TValue value)
{
lock(dict)
{
dict[key] = value;
}
}
public void Remove(TKey key)
{
lock(dict)
{
dict.Remove(key);
}
}
// can't check .ContainsKey before Get because it might change inbetween,
// so we need a TryGetValue
public bool TryGetValue(TKey key, out TValue result)
{
lock(dict)
{
return dict.TryGetValue(key, out result);
}
}
public List<TValue> GetValues()
{
lock(dict)
{
return dict.Values.ToList();
}
}
public void Clear()
{
lock(dict)
{
dict.Clear();
}
}
}
}

View File

@ -0,0 +1,55 @@
// replaces ConcurrentQueue which is not available in .NET 3.5 yet.
using System.Collections.Generic;
namespace Telepathy
{
public class SafeQueue<T>
{
Queue<T> queue = new Queue<T>();
// for statistics. don't call Count and assume that it's the same after the
// call.
public int Count
{
get
{
lock(queue)
{
return queue.Count;
}
}
}
public void Enqueue(T item)
{
lock(queue)
{
queue.Enqueue(item);
}
}
// can't check .Count before doing Dequeue because it might change inbetween,
// so we need a TryDequeue
public bool TryDequeue(out T result)
{
lock(queue)
{
result = default(T);
if (queue.Count > 0)
{
result = queue.Dequeue();
return true;
}
return false;
}
}
public void Clear()
{
lock(queue)
{
queue.Clear();
}
}
}
}

View File

@ -0,0 +1,155 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class Server : Common
{
// listener
TcpListener listener;
Thread listenerThread;
// clients with <connectionId, TcpClient>
SafeDictionary<uint, TcpClient> clients = new SafeDictionary<uint, TcpClient>();
// check if the server is running
public bool Active
{
get { return listenerThread != null && listenerThread.IsAlive; }
}
// the listener thread's listen function
void Listen(int port)
{
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// start listener
// (NoDelay disables nagle algorithm. lowers CPU% and latency)
listener = new TcpListener(new IPEndPoint(IPAddress.Any, port));
listener.Server.NoDelay = true;
listener.Start();
Logger.Log("Server is listening");
// keep accepting new clients
while (true)
{
// wait and accept new client
// note: 'using' sucks here because it will try to
// dispose after thread was started but we still need it
// in the thread
TcpClient client = listener.AcceptTcpClient();
// generate the next connection id (thread safely)
uint connectionId = counter.Next();
// spawn a thread for each client to listen to his
// messages
Thread thread = new Thread(() =>
{
// add to dict immediately
clients.Add(connectionId, client);
// run the receive loop
ReceiveLoop(connectionId, client);
// remove client from clients dict afterwards
clients.Remove(connectionId);
});
thread.IsBackground = true;
thread.Start();
}
}
catch (ThreadAbortException exception)
{
// UnityEditor causes AbortException if thread is still
// running when we press Play again next time. that's okay.
Logger.Log("Server thread aborted. That's okay. " + exception);
}
catch (SocketException exception)
{
// calling StopServer will interrupt this thread with a
// 'SocketException: interrupted'. that's okay.
Logger.Log("Server Thread stopped. That's okay. " + exception);
}
catch (Exception exception)
{
// something went wrong. probably important.
Logger.LogError("Server Exception: " + exception);
}
}
// start listening for new connections in a background thread and spawn
// a new thread for each one.
public void Start(int port)
{
// not if already started
if (Active) return;
// clear old messages in queue, just to be sure that the caller
// doesn't receive data from last time and gets out of sync.
// -> calling this in Stop isn't smart because the caller may
// still want to process all the latest messages afterwards
messageQueue.Clear();
// start the listener thread
Logger.Log("Server: starting on port=" + port);
listenerThread = new Thread(() => { Listen(port); });
listenerThread.IsBackground = true;
listenerThread.Start();
}
public void Stop()
{
// only if started
if (!Active) return;
Logger.Log("Server: stopping...");
// stop listening to connections so that no one can connect while we
// close the client connections
listener.Stop();
// close all client connections
List<TcpClient> connections = clients.GetValues();
foreach (TcpClient client in connections)
{
// this is supposed to disconnect gracefully, but the blocking
// Read calls throw a 'Read failure' exception in Unity
// sometimes (instead of returning 0)
client.GetStream().Close();
client.Close();
}
// clear clients list
clients.Clear();
}
// send message to client using socket connection.
public bool Send(uint connectionId, byte[] data)
{
// find the connection
TcpClient client;
if (clients.TryGetValue(connectionId, out client))
{
// GetStream() might throw exception if client is disconnected
try
{
NetworkStream stream = client.GetStream();
return SendMessage(stream, data);
}
catch (Exception exception)
{
Logger.LogWarning("Server.Send exception: " + exception);
return false;
}
}
Logger.LogWarning("Server.Send: invalid connectionId: " + connectionId);
return false;
}
}
}

View File

@ -0,0 +1,18 @@
// the transport layer implementation
namespace UnityEngine.Networking
{
public static class Transport
{
public static Telepathy.Client client = new Telepathy.Client();
public static Telepathy.Server server = new Telepathy.Server();
static Transport()
{
// tell Telepathy to use Unity's Debug.Log
Telepathy.Logger.LogMethod = Debug.Log;
Telepathy.Logger.LogWarningMethod = Debug.LogWarning;
Telepathy.Logger.LogErrorMethod = Debug.LogError;
}
}
}

View File

@ -65,12 +65,27 @@
<Compile Include="NetworkManagerHUD.cs" />
<Compile Include="NetworkBehaviour.cs" />
<Compile Include="NetworkIdentity.cs" />
<Compile Include="Telepathy\Client.cs" />
<Compile Include="Telepathy\Common.cs" />
<Compile Include="Telepathy\EventType.cs" />
<Compile Include="Telepathy\Logger.cs" />
<Compile Include="Telepathy\Message.cs" />
<Compile Include="Telepathy\NetworkStreamExtensions.cs" />
<Compile Include="Telepathy\SafeCounter.cs" />
<Compile Include="Telepathy\SafeDictionary.cs" />
<Compile Include="Telepathy\SafeQueue.cs" />
<Compile Include="Telepathy\Server.cs" />
<Compile Include="Transport.cs" />
<Compile Include="UNetwork.cs" />
<Compile Include="NetworkReader.cs" />
<Compile Include="NetworkServer.cs" />
<Compile Include="SyncList.cs" />
<Compile Include="NetworkWriter.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="Telepathy\LICENSE" />
<Content Include="Telepathy\README.md" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Target Name="AfterBuild">
<MakeDir Directories="$(ProjectDir)..\Output\Standalone" />

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="..\packages\NUnit.3.10.1\build\NUnit.props" Condition="Exists('..\packages\NUnit.3.10.1\build\NUnit.props')" />
<PropertyGroup>