Skip to content

Commit

Permalink
Added new options for controlling ordering and encryption. Use orderi…
Browse files Browse the repository at this point in the history
…ng if precise package handling is required for a server.

Fixes to service killer and client.
  • Loading branch information
NiclasOlofsson committed Jul 13, 2016
1 parent db22d3a commit c923121
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 90 deletions.
80 changes: 58 additions & 22 deletions src/MiNET/MiNET.Client/MiNetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class MiNetClient

private IPEndPoint _clientEndpoint;
private IPEndPoint _serverEndpoint;
private readonly DedicatedThreadPool _threadPool;
private short _mtuSize = 1400;
private int _reliableMessageNumber = -1;
private Vector3 _spawn;
Expand All @@ -57,11 +58,12 @@ public class MiNetClient
public string Username { get; set; }
public int ClientId { get; set; }

public MiNetClient(IPEndPoint endpoint, string username)
public MiNetClient(IPEndPoint endpoint, string username, DedicatedThreadPool threadPool)
{
Username = username;
ClientId = new Random().Next();
_serverEndpoint = endpoint;
_threadPool = threadPool;
if(_serverEndpoint != null) Log.Warn("Connecting to: " + _serverEndpoint);
_clientEndpoint = new IPEndPoint(IPAddress.Any, 0);
}
Expand All @@ -70,7 +72,11 @@ private static void Main(string[] args)
{
Console.WriteLine("Starting client...");

var client = new MiNetClient(null, "TheGrey");
int threads;
int iothreads;
ThreadPool.GetMinThreads(out threads, out iothreads);

var client = new MiNetClient(null, "TheGrey", new DedicatedThreadPool(new DedicatedThreadPoolSettings(threads)));
//var client = new MiNetClient(new IPEndPoint(Dns.GetHostEntry("pe.mineplex.com").AddressList[0], 19132), "TheGrey");
//var client = new MiNetClient(new IPEndPoint(Dns.GetHostEntry("yodamine.net").AddressList[0], 19132), "TheGrey");
//var client = new MiNetClient(new IPEndPoint(IPAddress.Parse("192.168.0.3"), 19132), "TheGrey");
Expand Down Expand Up @@ -165,7 +171,7 @@ public void StartClient()
/// <returns></returns>
public bool StopClient()
{
Environment.Exit(0);
//Environment.Exit(0);
try
{
if (UdpClient == null) return true; // Already stopped. It's ok.
Expand Down Expand Up @@ -311,19 +317,19 @@ private void ProcessMessage(byte[] receiveBytes, IPEndPoint senderEndpoint)
throw new Exception("Receive ERROR, NAK in wrong place");
}

//if (IsEmulator && PlayerStatus == 3)
//{
// int datagramId = new Int24(new[] {receiveBytes[1], receiveBytes[2], receiveBytes[3]});
if (IsEmulator && PlayerStatus == 3)
{
int datagramId = new Int24(new[] { receiveBytes[1], receiveBytes[2], receiveBytes[3] });

// //Acks ack = Acks.CreateObject();
// Acks ack = new Acks();
// ack.acks.Add(datagramId);
// byte[] data = ack.Encode();
// ack.PutPool();
// SendData(data, senderEndpoint);
//Acks ack = Acks.CreateObject();
Acks ack = new Acks();
ack.acks.Add(datagramId);
byte[] data = ack.Encode();
ack.PutPool();
SendData(data, senderEndpoint);

// //return;
//}
return;
}

ConnectedPackage package = ConnectedPackage.CreateObject();
//var package = new ConnectedPackage();
Expand Down Expand Up @@ -391,9 +397,11 @@ private void HandleConnectedPackage(ConnectedPackage package)
private object _eventSync = new object();
private ConcurrentPriorityQueue<int, Package> _queue = new ConcurrentPriorityQueue<int, Package>();

private Thread _processingThread = null;

public void AddToProcessing(Package message)
{
if (message.Reliability != Reliability.ReliableOrdered)
if (Session.CryptoContext == null || Session.CryptoContext.UseEncryption == false || message.Reliability != Reliability.ReliableOrdered)
{
HandlePackage(message);
return;
Expand All @@ -408,11 +416,22 @@ public void AddToProcessing(Package message)
return;
}

if(_processingThread == null)
{
_processingThread = new Thread(ProcessQueueThread);
_processingThread.Start();
}

_queue.Enqueue(message.OrderingIndex, message);
WaitHandle.SignalAndWait(_waitEvent, _mainWaitEvent);
}
}

private void ProcessQueueThread(object o)
{
ProcessQueue();
}

private Task ProcessQueue()
{
while (true)
Expand Down Expand Up @@ -572,8 +591,6 @@ private void HandleSplitMessage(PlayerNetworkSession playerSession, SplitPartPac
public int PlayerStatus { get; set; }


public AutoResetEvent FirstPacketWaitHandle = new AutoResetEvent(false);

private void HandlePackage(Package message)
{
//Log.Warn($"Package 0x{message.Id:X2} {message.GetType().Name}");
Expand All @@ -582,8 +599,6 @@ private void HandlePackage(Package message)
{
OnWrapper((McpeWrapper) message);

FirstPacketWaitHandle.Set();

return;
}

Expand Down Expand Up @@ -852,7 +867,8 @@ public void SendLogin(string username)

Session.CryptoContext = new CryptoContext()
{
ClientKey = clientKey
ClientKey = clientKey,
UseEncryption = false,
};

SendPackage(batch);
Expand Down Expand Up @@ -941,12 +957,19 @@ private void InitiateEncryption(string serverKey, byte[] randomKeyToken)
}
}

public AutoResetEvent FirstEncryptedPacketWaitHandle = new AutoResetEvent(false);

public AutoResetEvent FirstPacketWaitHandle = new AutoResetEvent(false);

private void OnWrapper(McpeWrapper message)
{
FirstPacketWaitHandle.Set();

// Get bytes
byte[] payload = message.payload;
if (Session.CryptoContext != null && Session.CryptoContext.UseEncryption)
{
FirstEncryptedPacketWaitHandle.Set();
payload = CryptoUtils.Decrypt(payload, Session.CryptoContext);
}

Expand All @@ -957,7 +980,16 @@ private void OnWrapper(McpeWrapper message)

TraceReceive(newMessage);

Task.Run(() => { HandlePackage(newMessage); });
if(_processingThread == null)
{
HandlePackage(newMessage);
}
else
{
_threadPool.QueueUserWorkItem(() => HandlePackage(newMessage));
}

//Task.Run(() => { HandlePackage(newMessage); });
}

private void OnMcpeAdventureSettings(McpeAdventureSettings message)
Expand All @@ -980,14 +1012,18 @@ private void OnMcpeHurtArmor(McpeHurtArmor message)
Log.Debug($"Hurt Armor: Health={message.health}");
}

public AutoResetEvent PlayerStatusChangedWaitHandle = new AutoResetEvent(false);

private void OnMcpePlayerStatus(McpePlayerStatus message)
{
if (Log.IsDebugEnabled) Log.Debug($"Player status={message.status}");
PlayerStatus = message.status;

if(PlayerStatus == 3)
{
PlayerStatusChangedWaitHandle.Set();

SendMcpeMovePlayer();
SendChat("/help");
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/MiNET/MiNET.ServiceKiller/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
</appender>

<root>
<level value="WARN"/>
<level value="FATAL"/>
<!--<appender-ref ref="Console"/>-->
<!--<appender-ref ref="RollingFile" />-->
<!--<appender-ref ref="DebuggerAppender"/>-->
Expand Down
Loading

0 comments on commit c923121

Please sign in to comment.