Skip to content

Commit

Permalink
Optimize thread usage on non-xbox users to scale better with xbox users.
Browse files Browse the repository at this point in the history
  • Loading branch information
NiclasOlofsson committed Jul 10, 2016
1 parent 79b9fdb commit 2f305dc
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 69 deletions.
2 changes: 0 additions & 2 deletions src/MiNET/MiNET/Items/ItemBow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public override void Release(Level world, Player player, BlockCoordinates blockC
float force = CalculateForce(timeUsed);
if (force < 0.1D) return;

Log.Warn($"Force {force}, time {timeUsed}");

Arrow arrow = new Arrow(player, world, !(force < 1.0));
arrow.KnownPosition = (PlayerLocation) player.KnownPosition.Clone();
arrow.KnownPosition.Y += 1.62f;
Expand Down
10 changes: 9 additions & 1 deletion src/MiNET/MiNET/MiNetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,15 @@ internal void HandlePackage(Package message, PlayerNetworkSession playerSession)

if (message.Reliability == Reliability.ReliableOrdered)
{
Task.Run(() => playerSession.AddToProcessing(message));
if (playerSession.CryptoContext == null || playerSession.CryptoContext.UseEncryption == false)
{
playerSession.AddToProcessing(message);
}
else
{
ThreadPool.QueueUserWorkItem(state => playerSession.AddToProcessing(message));
}

return;
}

Expand Down
4 changes: 3 additions & 1 deletion src/MiNET/MiNET/Player.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,13 +1308,15 @@ public virtual void SpawnLevel(Level toLevel, PlayerLocation spawnPoint, bool us

ThreadPool.QueueUserWorkItem(delegate
{
Level.AddPlayer(this, true);

ForcedSendChunks(() =>
{
Level.AddPlayer(this, true);

Log.InfoFormat("Respawn player {0} on level {1}", Username, Level.LevelId);

SendSetTime();

});
});
}
Expand Down
142 changes: 86 additions & 56 deletions src/MiNET/MiNET/PlayerNetworkSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public PlayerNetworkSession(Player player, IPEndPoint endPoint, short mtuSize)
MtuSize = mtuSize;
CreateTime = DateTime.UtcNow;
_cancellationToken = new CancellationTokenSource();
Task.Run(ProcessQueue, _cancellationToken.Token);
}

public ConcurrentDictionary<int, SplitPartPackage[]> Splits
Expand Down Expand Up @@ -105,6 +104,17 @@ public void Clean()

queue.Clear();
Splits.Clear();

try
{
_processingThread = null;
_cancellationToken.Dispose();
_waitEvent.Close();
_mainWaitEvent.Close();
}
catch (Exception e)
{
}
}

private long _lastSequenceNumber = -1; // That's the first message with wrapper
Expand All @@ -114,90 +124,102 @@ public void Clean()

private ConcurrentPriorityQueue<int, Package> _queue = new ConcurrentPriorityQueue<int, Package>();
private CancellationTokenSource _cancellationToken;
private Thread _processingThread;

public void AddToProcessing(Package message)
{
if (_cancellationToken.Token.IsCancellationRequested) return;

lock (_eventSync)
try
{
if (_queue.Count == 0 && message.OrderingIndex == _lastSequenceNumber + 1)
if (_cancellationToken.Token.IsCancellationRequested) return;

if (CryptoContext == null || CryptoContext.UseEncryption == false)
{
_lastSequenceNumber = message.OrderingIndex;
HandlePackage(message, this);
return;
}

_queue.Enqueue(message.OrderingIndex, message);
WaitHandle.SignalAndWait(_waitEvent, _mainWaitEvent);
}

//_queue.Add(new KeyValuePair<int, Package>(message.OrderingIndex, message), _cancellationToken.Token);
lock (_eventSync)
{
if (CryptoContext == null || CryptoContext.UseEncryption == false || (_queue.Count == 0 && message.OrderingIndex == _lastSequenceNumber + 1))
{
_lastSequenceNumber = message.OrderingIndex;
HandlePackage(message, this);
return;
}

//HandlePackage(message, this);
//message.PutPool();
if (_processingThread== null)
{
_processingThread = new Thread(ProcessQueueThread);
_processingThread.Start();
Log.Warn($"Started processing thread for {Player.Username}");
}

_queue.Enqueue(message.OrderingIndex, message);
WaitHandle.SignalAndWait(_waitEvent, _mainWaitEvent);
}
}
catch (Exception e)
{
}
}

private void ProcessQueueThread(object o)
{
ProcessQueue();
}

private Task ProcessQueue()
{
while (!_cancellationToken.IsCancellationRequested)
try
{
//try
//{
// KeyValuePair<int, Package> pair = _queue.Take(_cancellationToken.Token);

// //ThreadPool.QueueUserWorkItem(delegate (object data)
// //{
// // HandlePackage(data as Package, this);
// //}, pair.Value);

// HandlePackage(pair.Value as Package, this);

//}
//catch (Exception e)
//{
//}

KeyValuePair<int, Package> pair;

if (_queue.TryPeek(out pair))
while (!_cancellationToken.IsCancellationRequested)
{
if (pair.Key == _lastSequenceNumber + 1)
KeyValuePair<int, Package> pair;

if (_queue.TryPeek(out pair))
{
if (_queue.TryDequeue(out pair))
if (pair.Key == _lastSequenceNumber + 1)
{
_lastSequenceNumber = pair.Key;
if (_queue.TryDequeue(out pair))
{
_lastSequenceNumber = pair.Key;

HandlePackage(pair.Value, this);
HandlePackage(pair.Value, this);

if (_queue.Count == 0)
if (_queue.Count == 0)
{
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
}
}
}
else if (pair.Key <= _lastSequenceNumber)
{
if (Log.IsDebugEnabled) Log.Warn($"{Player.Username} - Resent. Expected {_lastSequenceNumber + 1}, but was {pair.Key}.");
if (_queue.TryDequeue(out pair))
{
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
pair.Value.PutPool();
}
}
}
else if (pair.Key <= _lastSequenceNumber)
{
if (Log.IsDebugEnabled) Log.Warn($"{Player.Username} - Resent. Expected {_lastSequenceNumber + 1}, but was {pair.Key}.");
if (_queue.TryDequeue(out pair))
else
{
pair.Value.PutPool();
if (Log.IsDebugEnabled) Log.Warn($"{Player.Username} - Wrong sequence. Expected {_lastSequenceNumber + 1}, but was {pair.Key}.");
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
}
}
else
{
if (Log.IsDebugEnabled) Log.Warn($"{Player.Username} - Wrong sequence. Expected {_lastSequenceNumber + 1}, but was {pair.Key}.");
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
}
}
else
{
if (_queue.Count == 0)
{
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
if (_queue.Count == 0)
{
WaitHandle.SignalAndWait(_mainWaitEvent, _waitEvent, TimeSpan.FromMilliseconds(50), true);
}
}
}
}
catch (Exception e)
{
}

//Log.Warn($"Exit receive handler task for {Player.Username}");
return Task.CompletedTask;
Expand Down Expand Up @@ -239,7 +261,7 @@ internal void HandlePackage(Package message, PlayerNetworkSession playerSession)
if (typeof (UnknownPackage) == message.GetType())
{
UnknownPackage packet = (UnknownPackage) message;
Log.Warn($"Received unknown package 0x{message.Id:X2}\n{Package.HexDump(packet.Message)}");
if(Log.IsDebugEnabled) Log.Warn($"Received unknown package 0x{message.Id:X2}\n{Package.HexDump(packet.Message)}");

message.PutPool();
return;
Expand Down Expand Up @@ -299,11 +321,19 @@ internal void HandlePackage(Package message, PlayerNetworkSession playerSession)

MiNetServer.TraceReceive(message, message.OrderingIndex);

ThreadPool.QueueUserWorkItem(delegate(object data)
if (CryptoContext != null && CryptoContext.UseEncryption)
{
playerSession.Player?.HandlePackage(data as Package);
ThreadPool.QueueUserWorkItem(delegate(object data)
{
playerSession.Player?.HandlePackage(data as Package);
message.PutPool();
}, message);
}
else
{
playerSession.Player?.HandlePackage(message);
message.PutPool();
}, message);
}
}
}
}
40 changes: 31 additions & 9 deletions src/MiNET/TestPlugin/CoreCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ public void Teleport(Player player)
[Command(Command = "tp")]
public void Teleport(Player player, string world)
{
Level oldLevel = player.Level;

if (player.Level.LevelId.Equals(world))
{
Teleport(player, (int) player.SpawnPosition.X, (int) player.SpawnPosition.Y, (int) player.SpawnPosition.Z);
Expand All @@ -270,18 +272,22 @@ public void Teleport(Player player, string world)

if (levels != null)
{
Level nextLevel = levels.FirstOrDefault(l => l.LevelId != null && l.LevelId.Equals(world));

if (nextLevel == null)
player.SpawnLevel(null, null, true, delegate
{
nextLevel = new Level(world, new FlatlandWorldProvider(), player.GameMode, Difficulty.Normal);
nextLevel.Initialize();
Context.LevelManager.Levels.Add(nextLevel);
}
Level nextLevel = levels.FirstOrDefault(l => l.LevelId != null && l.LevelId.Equals(world));

if (nextLevel == null)
{
nextLevel = new Level(world, new FlatlandWorldProvider(), player.GameMode, Difficulty.Normal);
nextLevel.Initialize();
Context.LevelManager.Levels.Add(nextLevel);
}

player.Level.BroadcastMessage(string.Format("{0} teleported to world {1}.", player.Username, nextLevel.LevelId), type: MessageType.Raw);
return nextLevel;
});

oldLevel.BroadcastMessage(string.Format("{0} teleported to world {1}.", player.Username, player.Level.LevelId), type: MessageType.Raw);

player.SpawnLevel(nextLevel);
}
}, Context.LevelManager.Levels.ToArray());
}
Expand Down Expand Up @@ -845,5 +851,21 @@ public void Test2(Player player)

}

[Command]
public void Count(Player player)
{
List<string> users = new List<string>();
var levels = Context.Server.LevelManager.Levels;
foreach (var level in levels)
{
foreach (var spawnedPlayer in level.GetSpawnedPlayers())
{
users.Add(spawnedPlayer.Username);
}
}

player.SendMessage($"There are {users.Count} of players online.");
}

}
}

0 comments on commit 2f305dc

Please sign in to comment.