From c467c9c818b54b87b6a6c1755e2045d088f75c2e Mon Sep 17 00:00:00 2001 From: Lucas Rosa Date: Tue, 29 Oct 2019 11:44:41 -0300 Subject: [PATCH 1/4] Debugging duplicated client id issue --- .../Server/MqttClientSessionsManager.cs | 80 +++++++++-- .../Server/MqttClientSubscriptionsManager.cs | 7 + .../Server/MqttServerEventDispatcher.cs | 2 + .../MQTTnet.Core.Tests/Server_Status_Tests.cs | 5 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 128 +++++++++++++++++- 5 files changed, 208 insertions(+), 14 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index db70e95..1e7c1b6 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; @@ -29,6 +30,27 @@ namespace MQTTnet.Server private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; + public static class TestLogger + { + public static void WriteLine(string message) + { + var path = @"c:\temp\test1.txt"; + FileStream logFile; + if (!System.IO.File.Exists(path)) + logFile = System.IO.File.Create(path); + else + logFile = System.IO.File.Open(path, FileMode.Append); + + using (var writer = new System.IO.StreamWriter(logFile)) + { + writer.WriteLine($"{DateTime.Now} - {message}"); + } + + logFile.Dispose(); + } + } + + public MqttClientSessionsManager( IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, @@ -36,6 +58,7 @@ namespace MQTTnet.Server MqttServerEventDispatcher eventDispatcher, IMqttNetChildLogger logger) { + TestLogger.WriteLine("Newly new"); _cancellationToken = cancellationToken; if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -48,11 +71,13 @@ namespace MQTTnet.Server public void Start() { + TestLogger.WriteLine("Start"); Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger); } public async Task StopAsync() { + TestLogger.WriteLine("Stop"); foreach (var connection in _connections.Values) { await connection.StopAsync().ConfigureAwait(false); @@ -66,6 +91,7 @@ namespace MQTTnet.Server public Task> GetClientStatusAsync() { + TestLogger.WriteLine("Status"); var result = new List(); foreach (var connection in _connections.Values) @@ -85,6 +111,7 @@ namespace MQTTnet.Server public Task> GetSessionStatusAsync() { + TestLogger.WriteLine("Session"); var result = new List(); foreach (var session in _sessions.Values) @@ -100,6 +127,7 @@ namespace MQTTnet.Server public void DispatchApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender) { + TestLogger.WriteLine("Message"); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); _messageQueue.Enqueue(new MqttEnqueuedApplicationMessage(applicationMessage, sender)); @@ -107,6 +135,7 @@ namespace MQTTnet.Server public Task SubscribeAsync(string clientId, ICollection topicFilters) { + TestLogger.WriteLine("sub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -120,6 +149,7 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(string clientId, IEnumerable topicFilters) { + TestLogger.WriteLine("unsub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -133,6 +163,7 @@ namespace MQTTnet.Server public async Task DeleteSessionAsync(string clientId) { + TestLogger.WriteLine("Delete"); if (_connections.TryGetValue(clientId, out var connection)) { await connection.StopAsync().ConfigureAwait(false); @@ -147,11 +178,13 @@ namespace MQTTnet.Server public void Dispose() { + TestLogger.WriteLine("byebye"); _messageQueue?.Dispose(); } private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) { + TestLogger.WriteLine("queue"); while (!cancellationToken.IsCancellationRequested) { try @@ -170,6 +203,7 @@ namespace MQTTnet.Server private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) { + TestLogger.WriteLine("process message"); try { if (cancellationToken.IsCancellationRequested) @@ -178,6 +212,7 @@ namespace MQTTnet.Server } var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); + TestLogger.WriteLine("dequeued"); var queuedApplicationMessage = dequeueResult.Item; var sender = queuedApplicationMessage.Sender; @@ -209,6 +244,7 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); } + TestLogger.WriteLine($"sessions: {_sessions.Count}"); foreach (var clientSession in _sessions.Values) { clientSession.EnqueueApplicationMessage( @@ -219,18 +255,23 @@ namespace MQTTnet.Server } catch (OperationCanceledException) { + TestLogger.WriteLine($"no queue"); } catch (Exception exception) { + TestLogger.WriteLine($"no queue {exception}"); _logger.Error(exception, "Unhandled exception while processing next queued application message."); } } private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { + TestLogger.WriteLine($"handle"); var disconnectType = MqttClientDisconnectType.NotClean; string clientId = null; + var ok = true; + try { var firstPacket = await channelAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); @@ -241,11 +282,14 @@ namespace MQTTnet.Server } clientId = connectPacket.ClientId; + TestLogger.WriteLine($"validating {clientId}"); var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { + TestLogger.WriteLine($"{clientId} not good"); + ok = false; // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); @@ -254,42 +298,53 @@ namespace MQTTnet.Server return; } + TestLogger.WriteLine($"{clientId} good"); + var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false); + + TestLogger.WriteLine($"{clientId} all good"); } catch (OperationCanceledException) { + TestLogger.WriteLine($"no"); } catch (Exception exception) { + TestLogger.WriteLine($"no {exception}"); _logger.Error(exception, exception.Message); } finally { - if (clientId != null) - { - _connections.TryRemove(clientId, out _); - - if (!_options.EnablePersistentSessions) + if (ok) + { + TestLogger.WriteLine($"finally {clientId}"); + if (clientId != null) { - await DeleteSessionAsync(clientId).ConfigureAwait(false); + _connections.TryRemove(clientId, out _); + + if (!_options.EnablePersistentSessions) + { + await DeleteSessionAsync(clientId).ConfigureAwait(false); + } } - } - await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false); + await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false); - if (clientId != null) - { - await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false); + if (clientId != null) + { + await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false); + } } } } private async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { + TestLogger.WriteLine("validate"); var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); var connectionValidator = _options.ConnectionValidator; @@ -318,6 +373,7 @@ namespace MQTTnet.Server private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { + TestLogger.WriteLine("create"); await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); try { @@ -364,6 +420,7 @@ namespace MQTTnet.Server private async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) { + TestLogger.WriteLine("intercept"); var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) { @@ -392,6 +449,7 @@ namespace MQTTnet.Server private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter) { + TestLogger.WriteLine("clean"); try { await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index c84a018..3b166ba 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using MQTTnet.Packets; using MQTTnet.Protocol; +using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server { @@ -16,6 +17,7 @@ namespace MQTTnet.Server public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) { + TestLogger.WriteLine("sub manager"); _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); // TODO: Consider removing the server options here and build a new class "ISubscriptionInterceptor" and just pass it. The instance is generated in the root server class upon start. @@ -25,6 +27,7 @@ namespace MQTTnet.Server public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, MqttConnectPacket connectPacket) { + TestLogger.WriteLine("sub1"); if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); @@ -76,6 +79,7 @@ namespace MQTTnet.Server public async Task SubscribeAsync(IEnumerable topicFilters) { + TestLogger.WriteLine("sub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); foreach (var topicFilter in topicFilters) @@ -100,6 +104,7 @@ namespace MQTTnet.Server public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) { + TestLogger.WriteLine("unsub1"); if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); var unsubAckPacket = new MqttUnsubAckPacket @@ -132,6 +137,7 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(IEnumerable topicFilters) { + TestLogger.WriteLine("unsub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); lock (_subscriptions) @@ -147,6 +153,7 @@ namespace MQTTnet.Server public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel) { + TestLogger.WriteLine("check"); var qosLevels = new HashSet(); lock (_subscriptions) diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index e6e608a..d230eac 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; +using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server { @@ -55,6 +56,7 @@ namespace MQTTnet.Server public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) { + TestLogger.WriteLine("handle sub"); var handler = ClientSubscribedTopicHandler; if (handler == null) { diff --git a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs index 8111bd0..c4a5d27 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs @@ -6,6 +6,7 @@ using MQTTnet.Tests.Mockups; using MQTTnet.Client; using MQTTnet.Protocol; using MQTTnet.Server; +using System.Threading; namespace MQTTnet.Tests { @@ -55,10 +56,10 @@ namespace MQTTnet.Tests var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("client1")); - await Task.Delay(500); + await Task.Delay(1000); var clientStatus = await server.GetClientStatusAsync(); - + Assert.AreEqual(1, clientStatus.Count); Assert.IsTrue(clientStatus.Any(s => s.ClientId == "client1")); diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index b2b3b70..5353ebd 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -917,6 +917,109 @@ namespace MQTTnet.Tests } } + + private Dictionary _connected; + private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs) + { + if (_connected.ContainsKey(eventArgs.ClientId)) + { + eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + return; + } + _connected[eventArgs.ClientId] = true; + eventArgs.ReasonCode = MqttConnectReasonCode.Success; + return; + } + + [TestMethod] + public async Task Same_Client_Id_Refuse_Connection() + { + using (var testEnvironment = new TestEnvironment()) + { + _connected = new Dictionary(); + var options = new MqttServerOptionsBuilder(); + options.WithConnectionValidator(e => ConnectionValidationHandler(e)); + var server = await testEnvironment.StartServerAsync(options); + + var events = new List(); + + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => + { + lock (events) + { + events.Add("c"); + } + }); + + server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => + { + lock (events) + { + events.Add("d"); + } + }); + + var clientOptions = new MqttClientOptionsBuilder() + .WithClientId("same_id"); + + // c + var c1 = await testEnvironment.ConnectClientAsync(clientOptions); + + c1.UseDisconnectedHandler(_ => + { + lock (events) + { + events.Add("x"); + } + }); + + + c1.UseApplicationMessageReceivedHandler(_ => + { + lock (events) + { + events.Add("r"); + } + + }); + + c1.SubscribeAsync("topic").Wait(); + + await Task.Delay(500); + + c1.PublishAsync("topic").Wait(); + + await Task.Delay(500); + + + var flow = string.Join(string.Empty, events); + Assert.AreEqual("cr", flow); + + try + { + await testEnvironment.ConnectClientAsync(clientOptions); + Assert.Fail("same id connection is expected to fail"); + } + catch + { + //same id connection is expected to fail + } + + await Task.Delay(500); + + flow = string.Join(string.Empty, events); + Assert.AreEqual("cr", flow); + + c1.PublishAsync("topic").Wait(); + + await Task.Delay(500); + + flow = string.Join(string.Empty, events); + Assert.AreEqual("crr", flow); + + } + } + [TestMethod] public async Task Same_Client_Id_Connect_Disconnect_Event_Order() { @@ -956,17 +1059,40 @@ namespace MQTTnet.Tests // dc var c2 = await testEnvironment.ConnectClientAsync(clientOptions); + c2.UseApplicationMessageReceivedHandler(_ => + { + lock (events) + { + events.Add("r"); + } + + }); + c2.SubscribeAsync("topic").Wait(); + await Task.Delay(500); flow = string.Join(string.Empty, events); Assert.AreEqual("cdc", flow); + // r + c2.PublishAsync("topic").Wait(); + + await Task.Delay(500); + + flow = string.Join(string.Empty, events); + Assert.AreEqual("cdcr", flow); + + // nothing + + Assert.AreEqual(false, c1.IsConnected); await c1.DisconnectAsync(); + Assert.AreEqual (false, c1.IsConnected); await Task.Delay(500); // d + Assert.AreEqual(true, c2.IsConnected); await c2.DisconnectAsync(); await Task.Delay(500); @@ -974,7 +1100,7 @@ namespace MQTTnet.Tests await server.StopAsync(); flow = string.Join(string.Empty, events); - Assert.AreEqual("cdcd", flow); + Assert.AreEqual("cdcrd", flow); } } From e3f6b579b2dc56020cdcc26157d40a8edd39a904 Mon Sep 17 00:00:00 2001 From: Lucas Rosa Date: Tue, 29 Oct 2019 12:32:20 -0300 Subject: [PATCH 2/4] Working PoC --- Source/MQTTnet/Server/MqttClientConnection.cs | 11 +++++ .../Server/MqttClientSessionsManager.cs | 40 +++++++++++-------- .../Server/MqttClientSubscriptionsManager.cs | 4 +- .../Mockups/TestEnvironment.cs | 4 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 21 ++++++++++ 5 files changed, 60 insertions(+), 20 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index e71d1a8..677f920 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -12,6 +12,7 @@ using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; +using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server { @@ -51,6 +52,7 @@ namespace MQTTnet.Server MqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) { + TestLogger.WriteLine($"MqttClientConnection init"); Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); @@ -79,6 +81,7 @@ namespace MQTTnet.Server public async Task StopAsync() { + TestLogger.WriteLine($"MqttClientConnection stop"); StopInternal(); var task = _packageReceiverTask; @@ -153,12 +156,14 @@ namespace MQTTnet.Server while (!_cancellationToken.IsCancellationRequested) { + TestLogger.WriteLine($"MqttClientConnection while"); var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationToken.Token).ConfigureAwait(false); if (packet == null) { // The client has closed the connection gracefully. break; } + TestLogger.WriteLine($"MqttClientConnection pack"); Interlocked.Increment(ref _sentPacketsCount); _lastPacketReceivedTimestamp = DateTime.UtcNow; @@ -257,11 +262,13 @@ namespace MQTTnet.Server private void StopInternal() { + TestLogger.WriteLine($"MqttClientConnection stop int"); _cancellationToken.Cancel(false); } private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) { + TestLogger.WriteLine($"MqttClientConnection retainedmessages"); var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var applicationMessage in retainedMessages) { @@ -271,6 +278,7 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) { + TestLogger.WriteLine($"MqttClientConnection subpacket"); // TODO: Let the channel adapter create the packet. var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); @@ -294,6 +302,7 @@ namespace MQTTnet.Server private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { + TestLogger.WriteLine($"MqttClientConnection pub"); Interlocked.Increment(ref _sentApplicationMessagesCount); switch (publishPacket.QualityOfServiceLevel) @@ -351,6 +360,7 @@ namespace MQTTnet.Server private async Task SendPendingPacketsAsync(CancellationToken cancellationToken) { + TestLogger.WriteLine($"MqttClientConnection send"); MqttQueuedApplicationMessage queuedApplicationMessage = null; MqttPublishPacket publishPacket = null; @@ -466,6 +476,7 @@ namespace MQTTnet.Server private async Task SendAsync(MqttBasePacket packet) { + TestLogger.WriteLine($"MqttClientConnection send"); await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false); Interlocked.Increment(ref _receivedPacketsCount); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 1e7c1b6..61a3bf2 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -32,21 +32,26 @@ namespace MQTTnet.Server public static class TestLogger { + private static object _lock = new object(); public static void WriteLine(string message) { - var path = @"c:\temp\test1.txt"; - FileStream logFile; - if (!System.IO.File.Exists(path)) - logFile = System.IO.File.Create(path); - else - logFile = System.IO.File.Open(path, FileMode.Append); - - using (var writer = new System.IO.StreamWriter(logFile)) + lock (_lock) { - writer.WriteLine($"{DateTime.Now} - {message}"); + var path = @"c:\temp\test1.txt"; + FileStream logFile; + if (!System.IO.File.Exists(path)) + logFile = System.IO.File.Create(path); + else + logFile = System.IO.File.Open(path, FileMode.Append); + + using (var writer = new System.IO.StreamWriter(logFile)) + { + writer.WriteLine($"{DateTime.Now} - {message}"); + } + + logFile.Dispose(); } - logFile.Dispose(); } } @@ -203,7 +208,7 @@ namespace MQTTnet.Server private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine("process message"); + TestLogger.WriteLine("pm process message"); try { if (cancellationToken.IsCancellationRequested) @@ -212,7 +217,7 @@ namespace MQTTnet.Server } var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); - TestLogger.WriteLine("dequeued"); + TestLogger.WriteLine("pm dequeued"); var queuedApplicationMessage = dequeueResult.Item; var sender = queuedApplicationMessage.Sender; @@ -244,7 +249,7 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); } - TestLogger.WriteLine($"sessions: {_sessions.Count}"); + TestLogger.WriteLine($"pm sessions: {_sessions.Count}"); foreach (var clientSession in _sessions.Values) { clientSession.EnqueueApplicationMessage( @@ -255,11 +260,11 @@ namespace MQTTnet.Server } catch (OperationCanceledException) { - TestLogger.WriteLine($"no queue"); + TestLogger.WriteLine($"pm no queue"); } catch (Exception exception) { - TestLogger.WriteLine($"no queue {exception}"); + TestLogger.WriteLine($"pm no queue {exception}"); _logger.Error(exception, "Unhandled exception while processing next queued application message."); } } @@ -373,7 +378,7 @@ namespace MQTTnet.Server private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("create"); + TestLogger.WriteLine("cc create"); await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); try { @@ -402,6 +407,7 @@ namespace MQTTnet.Server if (session == null) { session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _logger); + TestLogger.WriteLine("cc created"); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } @@ -409,6 +415,8 @@ namespace MQTTnet.Server _connections[connection.ClientId] = connection; _sessions[session.ClientId] = session; + TestLogger.WriteLine($"cc connections {_connections.Count}"); + TestLogger.WriteLine($"cc sessions {_sessions.Count}"); return connection; } diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 3b166ba..3e39bab 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -153,7 +153,7 @@ namespace MQTTnet.Server public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel) { - TestLogger.WriteLine("check"); + TestLogger.WriteLine("check subs"); var qosLevels = new HashSet(); lock (_subscriptions) @@ -164,7 +164,7 @@ namespace MQTTnet.Server { continue; } - + TestLogger.WriteLine("is match"); qosLevels.Add(subscription.Value.QualityOfServiceLevel); } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index 5cda537..b5fa8a3 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -114,7 +114,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreServerLogErrors && _serverErrors.Count > 0) { - throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); + //throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); } } @@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreClientLogErrors && _clientErrors.Count > 0) { - throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); + //throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); } } } diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 5353ebd..de3e871 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -943,6 +943,8 @@ namespace MQTTnet.Tests var events = new List(); + var connected = true; + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => { lock (events) @@ -970,6 +972,15 @@ namespace MQTTnet.Tests lock (events) { events.Add("x"); + connected = false; + } + }); + + c1.UseConnectedHandler(_ => + { + lock (events) + { + connected = true; } }); @@ -1007,6 +1018,16 @@ namespace MQTTnet.Tests await Task.Delay(500); + /*if (!connected) + { + c1.ReconnectAsync().Wait(); + } + + if (!c1.IsConnected) + { + c1.ReconnectAsync().Wait(); + }*/ + flow = string.Join(string.Empty, events); Assert.AreEqual("cr", flow); From 5c25905510d2da368df6bd7626ede57057b0d9f5 Mon Sep 17 00:00:00 2001 From: Lucas Rosa Date: Tue, 29 Oct 2019 12:48:04 -0300 Subject: [PATCH 3/4] Clean up --- Source/MQTTnet/Server/MqttClientConnection.cs | 10 ---- .../Server/MqttClientSessionsManager.cs | 58 ------------------- .../Server/MqttClientSubscriptionsManager.cs | 7 --- .../Server/MqttServerEventDispatcher.cs | 1 - .../Mockups/TestEnvironment.cs | 4 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 25 +------- 6 files changed, 4 insertions(+), 101 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 677f920..a114e28 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -52,7 +52,6 @@ namespace MQTTnet.Server MqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) { - TestLogger.WriteLine($"MqttClientConnection init"); Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); @@ -81,7 +80,6 @@ namespace MQTTnet.Server public async Task StopAsync() { - TestLogger.WriteLine($"MqttClientConnection stop"); StopInternal(); var task = _packageReceiverTask; @@ -156,14 +154,12 @@ namespace MQTTnet.Server while (!_cancellationToken.IsCancellationRequested) { - TestLogger.WriteLine($"MqttClientConnection while"); var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationToken.Token).ConfigureAwait(false); if (packet == null) { // The client has closed the connection gracefully. break; } - TestLogger.WriteLine($"MqttClientConnection pack"); Interlocked.Increment(ref _sentPacketsCount); _lastPacketReceivedTimestamp = DateTime.UtcNow; @@ -262,13 +258,11 @@ namespace MQTTnet.Server private void StopInternal() { - TestLogger.WriteLine($"MqttClientConnection stop int"); _cancellationToken.Cancel(false); } private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) { - TestLogger.WriteLine($"MqttClientConnection retainedmessages"); var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var applicationMessage in retainedMessages) { @@ -278,7 +272,6 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) { - TestLogger.WriteLine($"MqttClientConnection subpacket"); // TODO: Let the channel adapter create the packet. var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); @@ -302,7 +295,6 @@ namespace MQTTnet.Server private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { - TestLogger.WriteLine($"MqttClientConnection pub"); Interlocked.Increment(ref _sentApplicationMessagesCount); switch (publishPacket.QualityOfServiceLevel) @@ -360,7 +352,6 @@ namespace MQTTnet.Server private async Task SendPendingPacketsAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine($"MqttClientConnection send"); MqttQueuedApplicationMessage queuedApplicationMessage = null; MqttPublishPacket publishPacket = null; @@ -476,7 +467,6 @@ namespace MQTTnet.Server private async Task SendAsync(MqttBasePacket packet) { - TestLogger.WriteLine($"MqttClientConnection send"); await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false); Interlocked.Increment(ref _receivedPacketsCount); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 61a3bf2..5a27b46 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -30,32 +30,6 @@ namespace MQTTnet.Server private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; - public static class TestLogger - { - private static object _lock = new object(); - public static void WriteLine(string message) - { - lock (_lock) - { - var path = @"c:\temp\test1.txt"; - FileStream logFile; - if (!System.IO.File.Exists(path)) - logFile = System.IO.File.Create(path); - else - logFile = System.IO.File.Open(path, FileMode.Append); - - using (var writer = new System.IO.StreamWriter(logFile)) - { - writer.WriteLine($"{DateTime.Now} - {message}"); - } - - logFile.Dispose(); - } - - } - } - - public MqttClientSessionsManager( IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, @@ -63,7 +37,6 @@ namespace MQTTnet.Server MqttServerEventDispatcher eventDispatcher, IMqttNetChildLogger logger) { - TestLogger.WriteLine("Newly new"); _cancellationToken = cancellationToken; if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -76,13 +49,11 @@ namespace MQTTnet.Server public void Start() { - TestLogger.WriteLine("Start"); Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger); } public async Task StopAsync() { - TestLogger.WriteLine("Stop"); foreach (var connection in _connections.Values) { await connection.StopAsync().ConfigureAwait(false); @@ -96,7 +67,6 @@ namespace MQTTnet.Server public Task> GetClientStatusAsync() { - TestLogger.WriteLine("Status"); var result = new List(); foreach (var connection in _connections.Values) @@ -116,7 +86,6 @@ namespace MQTTnet.Server public Task> GetSessionStatusAsync() { - TestLogger.WriteLine("Session"); var result = new List(); foreach (var session in _sessions.Values) @@ -132,7 +101,6 @@ namespace MQTTnet.Server public void DispatchApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender) { - TestLogger.WriteLine("Message"); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); _messageQueue.Enqueue(new MqttEnqueuedApplicationMessage(applicationMessage, sender)); @@ -140,7 +108,6 @@ namespace MQTTnet.Server public Task SubscribeAsync(string clientId, ICollection topicFilters) { - TestLogger.WriteLine("sub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -154,7 +121,6 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(string clientId, IEnumerable topicFilters) { - TestLogger.WriteLine("unsub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -168,7 +134,6 @@ namespace MQTTnet.Server public async Task DeleteSessionAsync(string clientId) { - TestLogger.WriteLine("Delete"); if (_connections.TryGetValue(clientId, out var connection)) { await connection.StopAsync().ConfigureAwait(false); @@ -183,13 +148,11 @@ namespace MQTTnet.Server public void Dispose() { - TestLogger.WriteLine("byebye"); _messageQueue?.Dispose(); } private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine("queue"); while (!cancellationToken.IsCancellationRequested) { try @@ -208,7 +171,6 @@ namespace MQTTnet.Server private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine("pm process message"); try { if (cancellationToken.IsCancellationRequested) @@ -217,7 +179,6 @@ namespace MQTTnet.Server } var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); - TestLogger.WriteLine("pm dequeued"); var queuedApplicationMessage = dequeueResult.Item; var sender = queuedApplicationMessage.Sender; @@ -249,7 +210,6 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); } - TestLogger.WriteLine($"pm sessions: {_sessions.Count}"); foreach (var clientSession in _sessions.Values) { clientSession.EnqueueApplicationMessage( @@ -260,18 +220,15 @@ namespace MQTTnet.Server } catch (OperationCanceledException) { - TestLogger.WriteLine($"pm no queue"); } catch (Exception exception) { - TestLogger.WriteLine($"pm no queue {exception}"); _logger.Error(exception, "Unhandled exception while processing next queued application message."); } } private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { - TestLogger.WriteLine($"handle"); var disconnectType = MqttClientDisconnectType.NotClean; string clientId = null; @@ -287,13 +244,11 @@ namespace MQTTnet.Server } clientId = connectPacket.ClientId; - TestLogger.WriteLine($"validating {clientId}"); var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { - TestLogger.WriteLine($"{clientId} not good"); ok = false; // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. @@ -303,30 +258,24 @@ namespace MQTTnet.Server return; } - TestLogger.WriteLine($"{clientId} good"); - var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false); - TestLogger.WriteLine($"{clientId} all good"); } catch (OperationCanceledException) { - TestLogger.WriteLine($"no"); } catch (Exception exception) { - TestLogger.WriteLine($"no {exception}"); _logger.Error(exception, exception.Message); } finally { if (ok) { - TestLogger.WriteLine($"finally {clientId}"); if (clientId != null) { _connections.TryRemove(clientId, out _); @@ -349,7 +298,6 @@ namespace MQTTnet.Server private async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("validate"); var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); var connectionValidator = _options.ConnectionValidator; @@ -378,7 +326,6 @@ namespace MQTTnet.Server private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("cc create"); await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); try { @@ -407,7 +354,6 @@ namespace MQTTnet.Server if (session == null) { session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _logger); - TestLogger.WriteLine("cc created"); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } @@ -415,8 +361,6 @@ namespace MQTTnet.Server _connections[connection.ClientId] = connection; _sessions[session.ClientId] = session; - TestLogger.WriteLine($"cc connections {_connections.Count}"); - TestLogger.WriteLine($"cc sessions {_sessions.Count}"); return connection; } @@ -428,7 +372,6 @@ namespace MQTTnet.Server private async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) { - TestLogger.WriteLine("intercept"); var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) { @@ -457,7 +400,6 @@ namespace MQTTnet.Server private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("clean"); try { await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 3e39bab..1d675ba 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -17,7 +17,6 @@ namespace MQTTnet.Server public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) { - TestLogger.WriteLine("sub manager"); _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); // TODO: Consider removing the server options here and build a new class "ISubscriptionInterceptor" and just pass it. The instance is generated in the root server class upon start. @@ -27,7 +26,6 @@ namespace MQTTnet.Server public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, MqttConnectPacket connectPacket) { - TestLogger.WriteLine("sub1"); if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); @@ -79,7 +77,6 @@ namespace MQTTnet.Server public async Task SubscribeAsync(IEnumerable topicFilters) { - TestLogger.WriteLine("sub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); foreach (var topicFilter in topicFilters) @@ -104,7 +101,6 @@ namespace MQTTnet.Server public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) { - TestLogger.WriteLine("unsub1"); if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); var unsubAckPacket = new MqttUnsubAckPacket @@ -137,7 +133,6 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(IEnumerable topicFilters) { - TestLogger.WriteLine("unsub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); lock (_subscriptions) @@ -153,7 +148,6 @@ namespace MQTTnet.Server public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel) { - TestLogger.WriteLine("check subs"); var qosLevels = new HashSet(); lock (_subscriptions) @@ -164,7 +158,6 @@ namespace MQTTnet.Server { continue; } - TestLogger.WriteLine("is match"); qosLevels.Add(subscription.Value.QualityOfServiceLevel); } } diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index d230eac..f899081 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -56,7 +56,6 @@ namespace MQTTnet.Server public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) { - TestLogger.WriteLine("handle sub"); var handler = ClientSubscribedTopicHandler; if (handler == null) { diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index b5fa8a3..5cda537 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -114,7 +114,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreServerLogErrors && _serverErrors.Count > 0) { - //throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); + throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); } } @@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreClientLogErrors && _clientErrors.Count > 0) { - //throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); + throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); } } } diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index de3e871..c3d6df1 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -936,6 +936,8 @@ namespace MQTTnet.Tests { using (var testEnvironment = new TestEnvironment()) { + testEnvironment.IgnoreClientLogErrors = true; + _connected = new Dictionary(); var options = new MqttServerOptionsBuilder(); options.WithConnectionValidator(e => ConnectionValidationHandler(e)); @@ -943,8 +945,6 @@ namespace MQTTnet.Tests var events = new List(); - var connected = true; - server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => { lock (events) @@ -972,15 +972,6 @@ namespace MQTTnet.Tests lock (events) { events.Add("x"); - connected = false; - } - }); - - c1.UseConnectedHandler(_ => - { - lock (events) - { - connected = true; } }); @@ -1002,7 +993,6 @@ namespace MQTTnet.Tests await Task.Delay(500); - var flow = string.Join(string.Empty, events); Assert.AreEqual("cr", flow); @@ -1018,16 +1008,6 @@ namespace MQTTnet.Tests await Task.Delay(500); - /*if (!connected) - { - c1.ReconnectAsync().Wait(); - } - - if (!c1.IsConnected) - { - c1.ReconnectAsync().Wait(); - }*/ - flow = string.Join(string.Empty, events); Assert.AreEqual("cr", flow); @@ -1037,7 +1017,6 @@ namespace MQTTnet.Tests flow = string.Join(string.Empty, events); Assert.AreEqual("crr", flow); - } } From 4105beb7e68a7068d9b9d3b9dacd71af8eaec828 Mon Sep 17 00:00:00 2001 From: Lucas Rosa Date: Tue, 29 Oct 2019 12:55:57 -0300 Subject: [PATCH 4/4] Some more cleaning up: --- Source/MQTTnet/Server/MqttClientConnection.cs | 1 - Source/MQTTnet/Server/MqttClientSessionsManager.cs | 9 +++------ Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs | 2 +- Source/MQTTnet/Server/MqttServerEventDispatcher.cs | 1 - 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index a114e28..e71d1a8 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -12,7 +12,6 @@ using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; -using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server { diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 5a27b46..b800d85 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.IO; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; @@ -231,8 +230,7 @@ namespace MQTTnet.Server { var disconnectType = MqttClientDisconnectType.NotClean; string clientId = null; - - var ok = true; + var clientWasConnected = true; try { @@ -249,7 +247,7 @@ namespace MQTTnet.Server if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { - ok = false; + clientWasConnected = false; // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); @@ -263,7 +261,6 @@ namespace MQTTnet.Server await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false); - } catch (OperationCanceledException) { @@ -274,7 +271,7 @@ namespace MQTTnet.Server } finally { - if (ok) + if (clientWasConnected) { if (clientId != null) { diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 1d675ba..c84a018 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading.Tasks; using MQTTnet.Packets; using MQTTnet.Protocol; -using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server { @@ -158,6 +157,7 @@ namespace MQTTnet.Server { continue; } + qosLevels.Add(subscription.Value.QualityOfServiceLevel); } } diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index f899081..e6e608a 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -2,7 +2,6 @@ using System.Threading.Tasks; using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; -using static MQTTnet.Server.MqttClientSessionsManager; namespace MQTTnet.Server {