From ec41efd8606c0a89e5db997b5aeed21f5bc7b220 Mon Sep 17 00:00:00 2001 From: Christian Date: Wed, 11 Apr 2018 21:22:08 +0200 Subject: [PATCH] Fix client dead lock and protocol version issues. --- Build/MQTTnet.AspNetCore.nuspec | 6 +- Build/MQTTnet.nuspec | 14 +-- .../Adapter/MqttChannelAdapter.cs | 8 +- .../Client/IMqttClientAdapterFactory.cs | 2 +- .../Client/IMqttClientOptions.cs | 2 +- .../MQTTnet.NetStandard/Client/MqttClient.cs | 92 +++++++++---------- .../Diagnostics/IMqttNetLogger.cs | 2 +- .../Diagnostics/MqttNetLogger.cs | 2 +- .../MqttClientAdapterFactory.cs | 20 +++- .../Implementations/MqttWebSocketChannel.cs | 2 +- .../ManagedClient/ManagedMqttClient.cs | 2 +- ...qttApplicationMessageInterceptorContext.cs | 4 + .../Server/MqttClientDisconnectedEventArgs.cs | 5 +- .../Server/MqttClientKeepAliveMonitor.cs | 2 +- .../Server/MqttClientPendingMessagesQueue.cs | 4 +- .../Server/MqttClientSession.cs | 23 +++-- .../Server/MqttClientSessionsManager.cs | 40 ++++---- .../Server/MqttRetainedMessagesManager.cs | 2 +- .../MQTTnet.NetStandard/Server/MqttServer.cs | 8 +- .../TestMqttCommunicationAdapterFactory.cs | 2 +- Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 49 +++++----- .../MainPage.xaml.cs | 27 ++++++ 22 files changed, 191 insertions(+), 127 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 0c48e8e..664b09c 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -2,7 +2,7 @@ MQTTnet.AspNetCore - 2.7.3 + 2.7.4 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,13 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is a support library to integrate MQTTnet into AspNetCore. - * Updated to MQTTnet 2.7.3. + * Updated to MQTTnet 2.7.4. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 1a7661f..2b7bf1e 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,9 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - + * [Client] Fixed a deadlock while the client disconnects. +* [Client] Fixed broken support for protocol version 3.1.0. * [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services. * [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!). +* [Server] The server no longer sends the will message of a client if the disconnect was clean (via _Disconnect_ packet). +* [Server] The application message interceptor now allows closing the connection. +* [Server] Added a new flag for the _ClientDisconnected_ event which contains a value indicating whether the disconnect was clean (via _Disconnect_ packet). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin @@ -42,12 +46,6 @@ - - - - - - @@ -65,8 +63,6 @@ - - \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 34ec1e0..c2f9ba4 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -36,7 +36,7 @@ namespace MQTTnet.Adapter public Task ConnectAsync(TimeSpan timeout) { ThrowIfDisposed(); - _logger.Trace("Connecting [Timeout={0}]", timeout); + _logger.Verbose("Connecting [Timeout={0}]", timeout); return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout)); } @@ -44,7 +44,7 @@ namespace MQTTnet.Adapter public Task DisconnectAsync(TimeSpan timeout) { ThrowIfDisposed(); - _logger.Trace("Disconnecting [Timeout={0}]", timeout); + _logger.Verbose("Disconnecting [Timeout={0}]", timeout); return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); } @@ -70,7 +70,7 @@ namespace MQTTnet.Adapter continue; } - _logger.Trace("TX >>> {0} [Timeout={1}]", packet, timeout); + _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); var chunks = PacketSerializer.Serialize(packet); foreach (var chunk in chunks) @@ -135,7 +135,7 @@ namespace MQTTnet.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - _logger.Trace("RX <<< {0}", packet); + _logger.Verbose("RX <<< {0}", packet); } finally { diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs index 8b14481..c2a7e74 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs @@ -5,6 +5,6 @@ namespace MQTTnet.Client { public interface IMqttClientAdapterFactory { - IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger); + IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger); } } diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs index 2f5b504..647f688 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Client TimeSpan CommunicationTimeout { get; } TimeSpan KeepAlivePeriod { get; } - TimeSpan? KeepAliveSendInterval { get; set; } + TimeSpan? KeepAliveSendInterval { get; } MqttProtocolVersion ProtocolVersion { get; } diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index f112566..90790d5 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -57,16 +57,16 @@ namespace MQTTnet.Client _packetIdentifierProvider.Reset(); _packetDispatcher.Reset(); - _adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger); + _adapter = _adapterFactory.CreateClientAdapter(options, _logger); - _logger.Trace("Trying to connect with server."); + _logger.Verbose("Trying to connect with server."); await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); - _logger.Trace("Connection with server established."); + _logger.Verbose("Connection with server established."); await StartReceivingPacketsAsync().ConfigureAwait(false); var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); - _logger.Trace("MQTT connection with server established."); + _logger.Verbose("MQTT connection with server established."); _sendTracker.Restart(); @@ -77,12 +77,14 @@ namespace MQTTnet.Client IsConnected = true; Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); + + _logger.Info("Connected."); return new MqttClientConnectResult(connectResponse.IsSessionPresent); } catch (Exception exception) { _logger.Error(exception, "Error while connecting with server."); - await DisconnectInternalAsync(exception).ConfigureAwait(false); + await DisconnectInternalAsync(null, exception).ConfigureAwait(false); throw; } @@ -104,7 +106,7 @@ namespace MQTTnet.Client } finally { - await DisconnectInternalAsync(null).ConfigureAwait(false); + await DisconnectInternalAsync(null, null).ConfigureAwait(false); } } @@ -159,7 +161,7 @@ namespace MQTTnet.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false); + await SendAsync(qosGroup.Cast().ToArray()).ConfigureAwait(false); break; } case MqttQualityOfServiceLevel.AtLeastOnce: @@ -236,33 +238,48 @@ namespace MQTTnet.Client if (IsConnected) throw new MqttProtocolViolationException(message); } - private async Task DisconnectInternalAsync(Exception exception) + private async Task DisconnectInternalAsync(Task sender, Exception exception) { await _disconnectLock.WaitAsync(); - var clientWasConnected = IsConnected; try { - IsConnected = false; - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) { return; } _cancellationTokenSource.Cancel(false); + } + catch (Exception adapterException) + { + _logger.Warning(adapterException, "Error while disconnecting from adapter."); + } + finally + { + _disconnectLock.Release(); + } + + var clientWasConnected = IsConnected; + IsConnected = false; - if (_packetReceiverTask != null) + try + { + if (_packetReceiverTask != null && _packetReceiverTask != sender) { - Task.WaitAll(_packetReceiverTask); + _packetReceiverTask.Wait(); } - if (_keepAliveMessageSenderTask != null) + if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender) { - Task.WaitAll(_keepAliveMessageSenderTask); + _keepAliveMessageSenderTask.Wait(); } - await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); - _logger.Trace("Disconnected from adapter."); + if (_adapter != null) + { + await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); + } + + _logger.Verbose("Disconnected from adapter."); } catch (Exception adapterException) { @@ -272,12 +289,9 @@ namespace MQTTnet.Client { _adapter?.Dispose(); _adapter = null; - _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; - _disconnectLock.Release(); - _logger.Info("Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); } @@ -287,8 +301,6 @@ namespace MQTTnet.Client { try { - _logger.Info("Received <<< {0}", packet); - if (packet is MqttPublishPacket publishPacket) { await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false); @@ -395,7 +407,7 @@ namespace MQTTnet.Client private async Task SendKeepAliveMessagesAsync() { - _logger.Info("Start sending keep alive packets."); + _logger.Verbose("Start sending keep alive packets."); try { @@ -415,37 +427,31 @@ namespace MQTTnet.Client await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false); } } - catch (OperationCanceledException) - { - } catch (Exception exception) { - if (_cancellationTokenSource.Token.IsCancellationRequested) + if (exception is OperationCanceledException) { - return; } - - if (exception is MqttCommunicationException) + else if (exception is MqttCommunicationException) { _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets."); } else { - _logger.Warning(exception, "Unhandled exception while sending/receiving keep alive packets."); - + _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); } - - await DisconnectInternalAsync(exception).ConfigureAwait(false); + + await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); } finally { - _logger.Info("Stopped sending keep alive packets."); + _logger.Verbose("Stopped sending keep alive packets."); } } private async Task ReceivePacketsAsync() { - _logger.Info("Start receiving packets."); + _logger.Verbose("Start receiving packets."); try { @@ -463,31 +469,25 @@ namespace MQTTnet.Client StartProcessReceivedPacket(packet); } } - catch (OperationCanceledException) - { - } catch (Exception exception) { - if (_cancellationTokenSource.IsCancellationRequested) + if (exception is OperationCanceledException) { - return; } - - if (exception is MqttCommunicationException) + else if (exception is MqttCommunicationException) { _logger.Warning(exception, "MQTT communication exception while receiving packets."); } else { _logger.Error(exception, "Unhandled exception while receiving packets."); - } - await DisconnectInternalAsync(exception).ConfigureAwait(false); + await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); } finally { - _logger.Info("Stopped receiving packets."); + _logger.Verbose("Stopped receiving packets."); } } diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs index a65207b..6ed16de 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs @@ -6,7 +6,7 @@ namespace MQTTnet.Diagnostics { event EventHandler LogMessagePublished; - void Trace(string message, params object[] parameters); + void Verbose(string message, params object[] parameters); void Info(string message, params object[] parameters); diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs index 8ad1d4c..53598b2 100644 --- a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs +++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Diagnostics public event EventHandler LogMessagePublished; - public void Trace(string message, params object[] parameters) + public void Verbose(string message, params object[] parameters) { Publish(MqttNetLogLevel.Verbose, null, message, parameters); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs index f8f0e99..4692d7e 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs @@ -8,18 +8,28 @@ namespace MQTTnet.Implementations { public class MqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); - switch (options) + var serializer = new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }; + + switch (options.ChannelOptions) { case MqttClientTcpOptions tcpOptions: - return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger); + { + return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), serializer, logger); + } + case MqttClientWebSocketOptions webSocketOptions: - return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger); + { + return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), serializer, logger); + } + default: - throw new NotSupportedException(); + { + throw new NotSupportedException(); + } } } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 0e85c6a..057a07e 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -9,7 +9,7 @@ using MQTTnet.Client; namespace MQTTnet.Implementations { - public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable + public sealed class MqttWebSocketChannel : IMqttChannel { // ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index 4570319..f1ce45d 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -244,7 +244,7 @@ namespace MQTTnet.ManagedClient } finally { - _logger.Trace("Stopped publishing messages."); + _logger.Verbose("Stopped publishing messages."); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs index 8074d52..5612601 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs @@ -11,5 +11,9 @@ public string ClientId { get; } public MqttApplicationMessage ApplicationMessage { get; set; } + + public bool AcceptPublish { get; set; } = true; + + public bool CloseConnection { get; set; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs index 78372bc..6dd505e 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs @@ -4,11 +4,14 @@ namespace MQTTnet.Server { public class MqttClientDisconnectedEventArgs : EventArgs { - public MqttClientDisconnectedEventArgs(ConnectedMqttClient client) + public MqttClientDisconnectedEventArgs(ConnectedMqttClient client, bool wasCleanDisconnect) { Client = client ?? throw new ArgumentNullException(nameof(client)); + WasCleanDisconnect = wasCleanDisconnect; } public ConnectedMqttClient Client { get; } + + public bool WasCleanDisconnect { get; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs index 6be02f3..3f21536 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs @@ -81,7 +81,7 @@ namespace MQTTnet.Server } finally { - _logger.Trace("Client {0}: Stopped checking keep alive timeout.", _clientId); + _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index aba1446..9949387 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -57,7 +57,7 @@ namespace MQTTnet.Server _queue.Enqueue(packet); _queueWaitSemaphore.Release(); - _logger.Trace("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); + _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); } private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) @@ -96,7 +96,7 @@ namespace MQTTnet.Server await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false); - _logger.Trace("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); + _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index c44467f..8514da9 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -23,6 +23,7 @@ namespace MQTTnet.Server private IMqttChannelAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; + private bool _wasCleanDisconnect; public MqttClientSession( string clientId, @@ -55,7 +56,7 @@ namespace MQTTnet.Server public bool IsConnected => _adapter != null; - public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -64,6 +65,7 @@ namespace MQTTnet.Server { var cancellationTokenSource = new CancellationTokenSource(); + _wasCleanDisconnect = false; _willMessage = connectPacket.WillMessage; _adapter = adapter; _cancellationTokenSource = cancellationTokenSource; @@ -84,9 +86,11 @@ namespace MQTTnet.Server { _logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } + + return _wasCleanDisconnect; } - public async Task StopAsync() + public async Task StopAsync(bool wasCleanDisconnect = false) { try { @@ -95,6 +99,8 @@ namespace MQTTnet.Server return; } + _wasCleanDisconnect = wasCleanDisconnect; + _cancellationTokenSource?.Cancel(false); PendingMessagesQueue.WaitForCompletion(); @@ -110,9 +116,10 @@ namespace MQTTnet.Server finally { var willMessage = _willMessage; - if (willMessage != null) + _willMessage = null; // clear willmessage so it is send just once + + if (willMessage != null && !wasCleanDisconnect) { - _willMessage = null; // clear willmessage so it is send just once await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false); } } @@ -246,7 +253,12 @@ namespace MQTTnet.Server return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken); } - if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) + if (packet is MqttDisconnectPacket) + { + return StopAsync(true); + } + + if (packet is MqttConnectPacket) { return StopAsync(); } @@ -262,7 +274,6 @@ namespace MQTTnet.Server if (subscribeResult.CloseConnection) { - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttDisconnectPacket() }).ConfigureAwait(false); await StopAsync().ConfigureAwait(false); } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index 56dd2d2..8dd9d06 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Server } public Action ClientConnectedCallback { get; set; } - public Action ClientDisconnectedCallback { get; set; } + public Action ClientDisconnectedCallback { get; set; } public Action ClientSubscribedTopicCallback { get; set; } public Action ClientUnsubscribedTopicCallback { get; set; } public Action ApplicationMessageReceivedCallback { get; set; } @@ -37,7 +37,9 @@ namespace MQTTnet.Server public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; + var wasCleanDisconnect = false; MqttClientSession clientSession = null; + try { if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken) @@ -84,7 +86,7 @@ namespace MQTTnet.Server ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion }); - await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); + wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -110,7 +112,8 @@ namespace MQTTnet.Server ClientId = clientId, ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion, PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0 - }); + }, + wasCleanDisconnect); } } @@ -156,8 +159,13 @@ namespace MQTTnet.Server { try { - applicationMessage = InterceptApplicationMessage(senderClientSession, applicationMessage); - if (applicationMessage == null) + var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage); + if (interceptorContext.CloseConnection) + { + await senderClientSession.StopAsync().ConfigureAwait(false); + } + + if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) { return; } @@ -230,20 +238,20 @@ namespace MQTTnet.Server } } - private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) + private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) { - var interceptor = _options.ApplicationMessageInterceptor; - if (interceptor == null) - { - return applicationMessage; - } - var interceptorContext = new MqttApplicationMessageInterceptorContext( senderClientSession?.ClientId, applicationMessage); + var interceptor = _options.ApplicationMessageInterceptor; + if (interceptor == null) + { + return interceptorContext; + } + interceptor(interceptorContext); - return interceptorContext.ApplicationMessage; + return interceptorContext; } private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) @@ -279,11 +287,11 @@ namespace MQTTnet.Server clientSession.Dispose(); clientSession = null; - _logger.Trace("Stopped existing session of client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Stopped existing session of client '{0}'.", connectPacket.ClientId); } else { - _logger.Trace("Reusing existing session of client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -302,7 +310,7 @@ namespace MQTTnet.Server _sessions[connectPacket.ClientId] = clientSession; - _logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId); + _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index 11cac24..f998557 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -132,7 +132,7 @@ namespace MQTTnet.Server if (!saveIsRequired) { - _logger.Trace("Skipped saving retained messages because no changes were detected."); + _logger.Verbose("Skipped saving retained messages because no changes were detected."); } if (saveIsRequired && _options.Storage != null) diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs index de33800..6329442 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs @@ -138,10 +138,10 @@ namespace MQTTnet.Server ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); } - private void OnClientDisconnected(ConnectedMqttClient client) + private void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect) { - _logger.Info("Client '{0}': Disconnected.", client.ClientId); - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client)); + _logger.Info("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect); + ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect)); } private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) @@ -162,7 +162,7 @@ namespace MQTTnet.Server private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { eventArgs.SessionTask = Task.Run( - async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false), + () => _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), _cancellationTokenSource.Token); } } diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs index eb291f9..4fe3537 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index 0c66169..80a8e5a 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -3,7 +3,6 @@ using System.Text; using System.Threading.Tasks; using MQTTnet.Protocol; using MQTTnet.Server; -using Newtonsoft.Json.Linq; namespace MQTTnet.TestApp.NetCore { @@ -38,6 +37,12 @@ namespace MQTTnet.TestApp.NetCore // based payload with the timestamp is a suitable use case. context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); } + + if (context.ApplicationMessage.Topic == "not_allowed_topic") + { + context.AcceptPublish = false; + context.CloseConnection = true; + } }, SubscriptionInterceptor = context => { @@ -72,27 +77,27 @@ namespace MQTTnet.TestApp.NetCore ConsoleColor.Magenta); }; - options.ApplicationMessageInterceptor = c => - { - if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0) - { - return; - } - - try - { - var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); - var timestampProperty = content.Property("timestamp"); - if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) - { - timestampProperty.Value = DateTime.Now.ToString("O"); - c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); - } - } - catch (Exception) - { - } - }; + //options.ApplicationMessageInterceptor = c => + //{ + // if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0) + // { + // return; + // } + + // try + // { + // var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); + // var timestampProperty = content.Property("timestamp"); + // if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) + // { + // timestampProperty.Value = DateTime.Now.ToString("O"); + // c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); + // } + // } + // catch (Exception) + // { + // } + //}; mqttServer.ClientDisconnected += (s, e) => { diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 3ed12bf..da0c2f3 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -13,6 +13,8 @@ using MQTTnet.Implementations; using MQTTnet.ManagedClient; using MQTTnet.Protocol; using MQTTnet.Server; +using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; +using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; namespace MQTTnet.TestApp.UniversalWindows { @@ -33,6 +35,11 @@ namespace MQTTnet.TestApp.UniversalWindows private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e) { _traceMessages.Enqueue(e.TraceMessage); + await UpdateLogAsync(); + } + + private async Task UpdateLogAsync() + { while (_traceMessages.Count > 100) { _traceMessages.TryDequeue(out _); @@ -113,11 +120,15 @@ namespace MQTTnet.TestApp.UniversalWindows { await _mqttClient.DisconnectAsync(); _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived; + _mqttClient.Connected -= OnConnected; + _mqttClient.Disconnected -= OnDisconnected; } var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + _mqttClient.Connected += OnConnected; + _mqttClient.Disconnected += OnDisconnected; await _mqttClient.ConnectAsync(options); } @@ -127,6 +138,22 @@ namespace MQTTnet.TestApp.UniversalWindows } } + private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e) + { + _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, + "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null)); + + Task.Run(UpdateLogAsync); + } + + private void OnConnected(object sender, MqttClientConnectedEventArgs e) + { + _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, + "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null)); + + Task.Run(UpdateLogAsync); + } + private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) { var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";