From 64b38487bc09a6a6f566e30df20b60bc39f1009f Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Wed, 20 Jun 2018 08:20:54 +0200 Subject: [PATCH] improve session manager --- .../Server/MqttClientSessionsManager.cs | 65 +++++++++++++------ 1 file changed, 46 insertions(+), 19 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 1cb301a..1b507d3 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; @@ -14,7 +15,11 @@ namespace MQTTnet.Server public class MqttClientSessionsManager { private readonly BlockingCollection _messageQueue = new BlockingCollection(); - private readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); + + /// + /// manual locking dictionaries is faster than using concurrent dictionary + /// + private readonly Dictionary _sessions = new Dictionary(); private readonly CancellationToken _cancellationToken; @@ -43,12 +48,16 @@ namespace MQTTnet.Server public Task StopAsync() { - foreach (var session in _sessions) + lock (_sessions) { - session.Value.Stop(MqttClientDisconnectType.NotClean); - } + foreach (var session in _sessions) + { + session.Value.Stop(MqttClientDisconnectType.NotClean); + } - _sessions.Clear(); + _sessions.Clear(); + } + _messageQueue.Dispose(); return Task.FromResult(0); } @@ -60,10 +69,11 @@ namespace MQTTnet.Server public Task> GetClientStatusAsync() { var result = new List(); - foreach (var session in _sessions) + + foreach (var session in GetSessions()) { - var status = new MqttClientSessionStatus(this, session.Value); - session.Value.FillStatus(status); + var status = new MqttClientSessionStatus(this, session); + session.FillStatus(status); result.Add(status); } @@ -83,12 +93,15 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - if (!_sessions.TryGetValue(clientId, out var session)) + lock (_sessions) { - throw new InvalidOperationException($"Client session '{clientId}' is unknown."); - } + if (!_sessions.TryGetValue(clientId, out var session)) + { + throw new InvalidOperationException($"Client session '{clientId}' is unknown."); + } - return session.SubscribeAsync(topicFilters); + return session.SubscribeAsync(topicFilters); + } } public Task UnsubscribeAsync(string clientId, IList topicFilters) @@ -96,17 +109,23 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - if (!_sessions.TryGetValue(clientId, out var session)) + lock (_sessions) { - throw new InvalidOperationException($"Client session '{clientId}' is unknown."); - } + if (!_sessions.TryGetValue(clientId, out var session)) + { + throw new InvalidOperationException($"Client session '{clientId}' is unknown."); + } - return session.UnsubscribeAsync(topicFilters); + return session.UnsubscribeAsync(topicFilters); + } } public void DeleteSession(string clientId) { - _sessions.TryRemove(clientId, out _); + lock (_sessions) + { + _sessions.Remove(clientId); + } _logger.Verbose("Session for client '{0}' deleted.", clientId); } @@ -143,7 +162,7 @@ namespace MQTTnet.Server _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult(); } - foreach (var clientSession in _sessions.Values) + foreach (var clientSession in GetSessions()) { clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage); } @@ -158,6 +177,14 @@ namespace MQTTnet.Server } } + private List GetSessions() + { + lock (_sessions) + { + return _sessions.Values.ToList(); + } + } + private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; @@ -263,7 +290,7 @@ namespace MQTTnet.Server { if (connectPacket.CleanSession) { - _sessions.TryRemove(connectPacket.ClientId, out _); + _sessions.Remove(connectPacket.ClientId); clientSession.Stop(MqttClientDisconnectType.Clean); clientSession.Dispose();