diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index 4c264f9..11d7111 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -291,13 +291,13 @@ namespace MQTTnet.Client private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) { _sendTracker.Restart(); - return _adapter.SendPacketsAsync(_options.CommunicationTimeout, cancellationToken, new[] { packet }); + return _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { packet }, cancellationToken); } private Task SendAsync(IEnumerable packets, CancellationToken cancellationToken) { _sendTracker.Restart(); - return _adapter.SendPacketsAsync(_options.CommunicationTimeout, cancellationToken, packets); + return _adapter.SendPacketsAsync(_options.CommunicationTimeout, packets, cancellationToken); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket @@ -313,8 +313,8 @@ namespace MQTTnet.Client var packetAwaiter = _packetDispatcher.AddPacketAwaiter(identifier); try { - await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, _cancellationTokenSource.Token).ConfigureAwait(false); - var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); + await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, cancellationToken).ConfigureAwait(false); + var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); return (TResponsePacket)respone; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs index 3f21536..8dc96e4 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs @@ -13,12 +13,12 @@ namespace MQTTnet.Server private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = new Stopwatch(); private readonly string _clientId; - private readonly Func _timeoutCallback; + private readonly Action _timeoutCallback; private readonly IMqttNetLogger _logger; private Task _workerTask; - public MqttClientKeepAliveMonitor(string clientId, Func timeoutCallback, IMqttNetLogger logger) + public MqttClientKeepAliveMonitor(string clientId, Action timeoutCallback, IMqttNetLogger logger) { _clientId = clientId; _timeoutCallback = timeoutCallback; @@ -61,10 +61,7 @@ namespace MQTTnet.Server { _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", _clientId); - if (_timeoutCallback != null) - { - await _timeoutCallback().ConfigureAwait(false); - } + _timeoutCallback?.Invoke(); return; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 8c98ac3..f56a8c9 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -127,7 +127,7 @@ namespace MQTTnet.Server if (!cancellationToken.IsCancellationRequested) { - await _clientSession.StopAsync(MqttClientDisconnectType.NotClean).ConfigureAwait(false); + _clientSession.Stop(MqttClientDisconnectType.NotClean); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index 3b51e9c..0e61b10 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -39,7 +39,7 @@ namespace MQTTnet.Server ClientId = clientId; - KeepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, StopDueToKeepAliveTimeoutAsync, _logger); + KeepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, StopDueToKeepAliveTimeout, _logger); SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server); PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); } @@ -101,13 +101,13 @@ namespace MQTTnet.Server return _wasCleanDisconnect; } - public async Task StopAsync(MqttClientDisconnectType type) + public void Stop(MqttClientDisconnectType type) { try { if (_cancellationTokenSource == null) { - return Task.FromResult(0); + return; } _wasCleanDisconnect = type == MqttClientDisconnectType.Clean; @@ -128,8 +128,6 @@ namespace MQTTnet.Server { _logger.Info("Client '{0}': Session stopped.", ClientId); } - - return Task.FromResult(0); } public async Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) @@ -183,10 +181,10 @@ namespace MQTTnet.Server _cancellationTokenSource?.Dispose(); } - private Task StopDueToKeepAliveTimeoutAsync() + private void StopDueToKeepAliveTimeout() { _logger.Info("Client '{0}': Timeout while waiting for KeepAlive packet.", ClientId); - return StopAsync(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean); } private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) @@ -214,7 +212,7 @@ namespace MQTTnet.Server _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } - await StopAsync(MqttClientDisconnectType.NotClean).ConfigureAwait(false); + Stop(MqttClientDisconnectType.NotClean); } } @@ -263,16 +261,20 @@ namespace MQTTnet.Server if (packet is MqttDisconnectPacket) { - return StopAsync(MqttClientDisconnectType.Clean); + Stop(MqttClientDisconnectType.Clean); + return Task.FromResult(0); } if (packet is MqttConnectPacket) { - return StopAsync(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean); + return Task.FromResult(0); } _logger.Warning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); - return StopAsync(MqttClientDisconnectType.NotClean); + Stop(MqttClientDisconnectType.NotClean); + + return Task.FromResult(0); } private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) @@ -291,7 +293,8 @@ namespace MQTTnet.Server if (subscribeResult.CloseConnection) { - await StopAsync(MqttClientDisconnectType.NotClean).ConfigureAwait(false); + Stop(MqttClientDisconnectType.NotClean); + return; } await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 9302f0a..98e5835 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -120,7 +120,7 @@ namespace MQTTnet.Server { foreach (var session in _sessions) { - await session.Value.StopAsync(MqttClientDisconnectType.NotClean).ConfigureAwait(false); + session.Value.Stop(MqttClientDisconnectType.NotClean); } _sessions.Clear(); @@ -232,7 +232,7 @@ namespace MQTTnet.Server { _sessions.Remove(connectPacket.ClientId); - await clientSession.StopAsync(MqttClientDisconnectType.Clean).ConfigureAwait(false); + clientSession.Stop(MqttClientDisconnectType.Clean); clientSession.Dispose(); clientSession = null; @@ -270,7 +270,7 @@ namespace MQTTnet.Server var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage); if (interceptorContext.CloseConnection) { - await senderClientSession.StopAsync().ConfigureAwait(false); + senderClientSession.Stop(MqttClientDisconnectType.NotClean); } if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs index 206942f..f3c4320 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs @@ -1,5 +1,4 @@ using System.Threading; -using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Diagnostics; using MQTTnet.Packets; @@ -18,7 +17,6 @@ namespace MQTTnet.Core.Tests var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate { timeoutCalledCount++; - return Task.FromResult(0); }, new MqttNetLogger()); Assert.AreEqual(0, timeoutCalledCount); @@ -40,7 +38,6 @@ namespace MQTTnet.Core.Tests var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate { timeoutCalledCount++; - return Task.FromResult(0); }, new MqttNetLogger()); Assert.AreEqual(0, timeoutCalledCount);