diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec
index 0c48e8e..664b09c 100644
--- a/Build/MQTTnet.AspNetCore.nuspec
+++ b/Build/MQTTnet.AspNetCore.nuspec
@@ -2,7 +2,7 @@
MQTTnet.AspNetCore
- 2.7.3
+ 2.7.4
Christian Kratky
Christian Kratky
https://github.com/chkr1011/MQTTnet/blob/master/LICENSE
@@ -10,13 +10,13 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
This is a support library to integrate MQTTnet into AspNetCore.
- * Updated to MQTTnet 2.7.3.
+ * Updated to MQTTnet 2.7.4.
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
-
+
diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 1a7661f..2b7bf1e 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -10,9 +10,13 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
-
+ * [Client] Fixed a deadlock while the client disconnects.
+* [Client] Fixed broken support for protocol version 3.1.0.
* [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services.
* [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!).
+* [Server] The server no longer sends the will message of a client if the disconnect was clean (via _Disconnect_ packet).
+* [Server] The application message interceptor now allows closing the connection.
+* [Server] Added a new flag for the _ClientDisconnected_ event which contains a value indicating whether the disconnect was clean (via _Disconnect_ packet).
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
@@ -42,12 +46,6 @@
-
-
-
-
-
-
@@ -65,8 +63,6 @@
-
-
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
index 34ec1e0..c2f9ba4 100644
--- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
+++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
@@ -36,7 +36,7 @@ namespace MQTTnet.Adapter
public Task ConnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
- _logger.Trace("Connecting [Timeout={0}]", timeout);
+ _logger.Verbose("Connecting [Timeout={0}]", timeout);
return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
}
@@ -44,7 +44,7 @@ namespace MQTTnet.Adapter
public Task DisconnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
- _logger.Trace("Disconnecting [Timeout={0}]", timeout);
+ _logger.Verbose("Disconnecting [Timeout={0}]", timeout);
return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
}
@@ -70,7 +70,7 @@ namespace MQTTnet.Adapter
continue;
}
- _logger.Trace("TX >>> {0} [Timeout={1}]", packet, timeout);
+ _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout);
var chunks = PacketSerializer.Serialize(packet);
foreach (var chunk in chunks)
@@ -135,7 +135,7 @@ namespace MQTTnet.Adapter
throw new MqttProtocolViolationException("Received malformed packet.");
}
- _logger.Trace("RX <<< {0}", packet);
+ _logger.Verbose("RX <<< {0}", packet);
}
finally
{
diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs
index 8b14481..c2a7e74 100644
--- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientAdapterFactory.cs
@@ -5,6 +5,6 @@ namespace MQTTnet.Client
{
public interface IMqttClientAdapterFactory
{
- IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger);
+ IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
index 2f5b504..647f688 100644
--- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
+++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
@@ -13,7 +13,7 @@ namespace MQTTnet.Client
TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; }
- TimeSpan? KeepAliveSendInterval { get; set; }
+ TimeSpan? KeepAliveSendInterval { get; }
MqttProtocolVersion ProtocolVersion { get; }
diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
index f112566..90790d5 100644
--- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
@@ -57,16 +57,16 @@ namespace MQTTnet.Client
_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();
- _adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
+ _adapter = _adapterFactory.CreateClientAdapter(options, _logger);
- _logger.Trace("Trying to connect with server.");
+ _logger.Verbose("Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
- _logger.Trace("Connection with server established.");
+ _logger.Verbose("Connection with server established.");
await StartReceivingPacketsAsync().ConfigureAwait(false);
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
- _logger.Trace("MQTT connection with server established.");
+ _logger.Verbose("MQTT connection with server established.");
_sendTracker.Restart();
@@ -77,12 +77,14 @@ namespace MQTTnet.Client
IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
+
+ _logger.Info("Connected.");
return new MqttClientConnectResult(connectResponse.IsSessionPresent);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while connecting with server.");
- await DisconnectInternalAsync(exception).ConfigureAwait(false);
+ await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
throw;
}
@@ -104,7 +106,7 @@ namespace MQTTnet.Client
}
finally
{
- await DisconnectInternalAsync(null).ConfigureAwait(false);
+ await DisconnectInternalAsync(null, null).ConfigureAwait(false);
}
}
@@ -159,7 +161,7 @@ namespace MQTTnet.Client
case MqttQualityOfServiceLevel.AtMostOnce:
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
- await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false);
+ await SendAsync(qosGroup.Cast().ToArray()).ConfigureAwait(false);
break;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
@@ -236,33 +238,48 @@ namespace MQTTnet.Client
if (IsConnected) throw new MqttProtocolViolationException(message);
}
- private async Task DisconnectInternalAsync(Exception exception)
+ private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
await _disconnectLock.WaitAsync();
- var clientWasConnected = IsConnected;
try
{
- IsConnected = false;
-
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
return;
}
_cancellationTokenSource.Cancel(false);
+ }
+ catch (Exception adapterException)
+ {
+ _logger.Warning(adapterException, "Error while disconnecting from adapter.");
+ }
+ finally
+ {
+ _disconnectLock.Release();
+ }
+
+ var clientWasConnected = IsConnected;
+ IsConnected = false;
- if (_packetReceiverTask != null)
+ try
+ {
+ if (_packetReceiverTask != null && _packetReceiverTask != sender)
{
- Task.WaitAll(_packetReceiverTask);
+ _packetReceiverTask.Wait();
}
- if (_keepAliveMessageSenderTask != null)
+ if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
{
- Task.WaitAll(_keepAliveMessageSenderTask);
+ _keepAliveMessageSenderTask.Wait();
}
- await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
- _logger.Trace("Disconnected from adapter.");
+ if (_adapter != null)
+ {
+ await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
+ }
+
+ _logger.Verbose("Disconnected from adapter.");
}
catch (Exception adapterException)
{
@@ -272,12 +289,9 @@ namespace MQTTnet.Client
{
_adapter?.Dispose();
_adapter = null;
-
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
- _disconnectLock.Release();
-
_logger.Info("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
@@ -287,8 +301,6 @@ namespace MQTTnet.Client
{
try
{
- _logger.Info("Received <<< {0}", packet);
-
if (packet is MqttPublishPacket publishPacket)
{
await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false);
@@ -395,7 +407,7 @@ namespace MQTTnet.Client
private async Task SendKeepAliveMessagesAsync()
{
- _logger.Info("Start sending keep alive packets.");
+ _logger.Verbose("Start sending keep alive packets.");
try
{
@@ -415,37 +427,31 @@ namespace MQTTnet.Client
await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false);
}
}
- catch (OperationCanceledException)
- {
- }
catch (Exception exception)
{
- if (_cancellationTokenSource.Token.IsCancellationRequested)
+ if (exception is OperationCanceledException)
{
- return;
}
-
- if (exception is MqttCommunicationException)
+ else if (exception is MqttCommunicationException)
{
_logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets.");
}
else
{
- _logger.Warning(exception, "Unhandled exception while sending/receiving keep alive packets.");
-
+ _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
}
-
- await DisconnectInternalAsync(exception).ConfigureAwait(false);
+
+ await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
finally
{
- _logger.Info("Stopped sending keep alive packets.");
+ _logger.Verbose("Stopped sending keep alive packets.");
}
}
private async Task ReceivePacketsAsync()
{
- _logger.Info("Start receiving packets.");
+ _logger.Verbose("Start receiving packets.");
try
{
@@ -463,31 +469,25 @@ namespace MQTTnet.Client
StartProcessReceivedPacket(packet);
}
}
- catch (OperationCanceledException)
- {
- }
catch (Exception exception)
{
- if (_cancellationTokenSource.IsCancellationRequested)
+ if (exception is OperationCanceledException)
{
- return;
}
-
- if (exception is MqttCommunicationException)
+ else if (exception is MqttCommunicationException)
{
_logger.Warning(exception, "MQTT communication exception while receiving packets.");
}
else
{
_logger.Error(exception, "Unhandled exception while receiving packets.");
-
}
- await DisconnectInternalAsync(exception).ConfigureAwait(false);
+ await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
}
finally
{
- _logger.Info("Stopped receiving packets.");
+ _logger.Verbose("Stopped receiving packets.");
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
index a65207b..6ed16de 100644
--- a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
+++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
@@ -6,7 +6,7 @@ namespace MQTTnet.Diagnostics
{
event EventHandler LogMessagePublished;
- void Trace(string message, params object[] parameters);
+ void Verbose(string message, params object[] parameters);
void Info(string message, params object[] parameters);
diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
index 8ad1d4c..53598b2 100644
--- a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
+++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
@@ -13,7 +13,7 @@ namespace MQTTnet.Diagnostics
public event EventHandler LogMessagePublished;
- public void Trace(string message, params object[] parameters)
+ public void Verbose(string message, params object[] parameters)
{
Publish(MqttNetLogLevel.Verbose, null, message, parameters);
}
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs
index f8f0e99..4692d7e 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttClientAdapterFactory.cs
@@ -8,18 +8,28 @@ namespace MQTTnet.Implementations
{
public class MqttClientAdapterFactory : IMqttClientAdapterFactory
{
- public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
+ public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
- switch (options)
+ var serializer = new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion };
+
+ switch (options.ChannelOptions)
{
case MqttClientTcpOptions tcpOptions:
- return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger);
+ {
+ return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), serializer, logger);
+ }
+
case MqttClientWebSocketOptions webSocketOptions:
- return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger);
+ {
+ return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), serializer, logger);
+ }
+
default:
- throw new NotSupportedException();
+ {
+ throw new NotSupportedException();
+ }
}
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
index 0e85c6a..057a07e 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
@@ -9,7 +9,7 @@ using MQTTnet.Client;
namespace MQTTnet.Implementations
{
- public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable
+ public sealed class MqttWebSocketChannel : IMqttChannel
{
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
index 4570319..f1ce45d 100644
--- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
@@ -244,7 +244,7 @@ namespace MQTTnet.ManagedClient
}
finally
{
- _logger.Trace("Stopped publishing messages.");
+ _logger.Verbose("Stopped publishing messages.");
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs
index 8074d52..5612601 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttApplicationMessageInterceptorContext.cs
@@ -11,5 +11,9 @@
public string ClientId { get; }
public MqttApplicationMessage ApplicationMessage { get; set; }
+
+ public bool AcceptPublish { get; set; } = true;
+
+ public bool CloseConnection { get; set; }
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs
index 78372bc..6dd505e 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientDisconnectedEventArgs.cs
@@ -4,11 +4,14 @@ namespace MQTTnet.Server
{
public class MqttClientDisconnectedEventArgs : EventArgs
{
- public MqttClientDisconnectedEventArgs(ConnectedMqttClient client)
+ public MqttClientDisconnectedEventArgs(ConnectedMqttClient client, bool wasCleanDisconnect)
{
Client = client ?? throw new ArgumentNullException(nameof(client));
+ WasCleanDisconnect = wasCleanDisconnect;
}
public ConnectedMqttClient Client { get; }
+
+ public bool WasCleanDisconnect { get; }
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
index 6be02f3..3f21536 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
@@ -81,7 +81,7 @@ namespace MQTTnet.Server
}
finally
{
- _logger.Trace("Client {0}: Stopped checking keep alive timeout.", _clientId);
+ _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
index aba1446..9949387 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
@@ -57,7 +57,7 @@ namespace MQTTnet.Server
_queue.Enqueue(packet);
_queueWaitSemaphore.Release();
- _logger.Trace("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
}
private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
@@ -96,7 +96,7 @@ namespace MQTTnet.Server
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);
- _logger.Trace("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
}
catch (Exception exception)
{
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
index c44467f..8514da9 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
@@ -23,6 +23,7 @@ namespace MQTTnet.Server
private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
+ private bool _wasCleanDisconnect;
public MqttClientSession(
string clientId,
@@ -55,7 +56,7 @@ namespace MQTTnet.Server
public bool IsConnected => _adapter != null;
- public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
+ public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
@@ -64,6 +65,7 @@ namespace MQTTnet.Server
{
var cancellationTokenSource = new CancellationTokenSource();
+ _wasCleanDisconnect = false;
_willMessage = connectPacket.WillMessage;
_adapter = adapter;
_cancellationTokenSource = cancellationTokenSource;
@@ -84,9 +86,11 @@ namespace MQTTnet.Server
{
_logger.Error(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
}
+
+ return _wasCleanDisconnect;
}
- public async Task StopAsync()
+ public async Task StopAsync(bool wasCleanDisconnect = false)
{
try
{
@@ -95,6 +99,8 @@ namespace MQTTnet.Server
return;
}
+ _wasCleanDisconnect = wasCleanDisconnect;
+
_cancellationTokenSource?.Cancel(false);
PendingMessagesQueue.WaitForCompletion();
@@ -110,9 +116,10 @@ namespace MQTTnet.Server
finally
{
var willMessage = _willMessage;
- if (willMessage != null)
+ _willMessage = null; // clear willmessage so it is send just once
+
+ if (willMessage != null && !wasCleanDisconnect)
{
- _willMessage = null; // clear willmessage so it is send just once
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false);
}
}
@@ -246,7 +253,12 @@ namespace MQTTnet.Server
return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken);
}
- if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
+ if (packet is MqttDisconnectPacket)
+ {
+ return StopAsync(true);
+ }
+
+ if (packet is MqttConnectPacket)
{
return StopAsync();
}
@@ -262,7 +274,6 @@ namespace MQTTnet.Server
if (subscribeResult.CloseConnection)
{
- await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttDisconnectPacket() }).ConfigureAwait(false);
await StopAsync().ConfigureAwait(false);
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
index 56dd2d2..8dd9d06 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
@@ -29,7 +29,7 @@ namespace MQTTnet.Server
}
public Action ClientConnectedCallback { get; set; }
- public Action ClientDisconnectedCallback { get; set; }
+ public Action ClientDisconnectedCallback { get; set; }
public Action ClientSubscribedTopicCallback { get; set; }
public Action ClientUnsubscribedTopicCallback { get; set; }
public Action ApplicationMessageReceivedCallback { get; set; }
@@ -37,7 +37,9 @@ namespace MQTTnet.Server
public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
+ var wasCleanDisconnect = false;
MqttClientSession clientSession = null;
+
try
{
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
@@ -84,7 +86,7 @@ namespace MQTTnet.Server
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
});
- await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
+ wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -110,7 +112,8 @@ namespace MQTTnet.Server
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0
- });
+ },
+ wasCleanDisconnect);
}
}
@@ -156,8 +159,13 @@ namespace MQTTnet.Server
{
try
{
- applicationMessage = InterceptApplicationMessage(senderClientSession, applicationMessage);
- if (applicationMessage == null)
+ var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
+ if (interceptorContext.CloseConnection)
+ {
+ await senderClientSession.StopAsync().ConfigureAwait(false);
+ }
+
+ if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{
return;
}
@@ -230,20 +238,20 @@ namespace MQTTnet.Server
}
}
- private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
+ private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
- var interceptor = _options.ApplicationMessageInterceptor;
- if (interceptor == null)
- {
- return applicationMessage;
- }
-
var interceptorContext = new MqttApplicationMessageInterceptorContext(
senderClientSession?.ClientId,
applicationMessage);
+ var interceptor = _options.ApplicationMessageInterceptor;
+ if (interceptor == null)
+ {
+ return interceptorContext;
+ }
+
interceptor(interceptorContext);
- return interceptorContext.ApplicationMessage;
+ return interceptorContext;
}
private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
@@ -279,11 +287,11 @@ namespace MQTTnet.Server
clientSession.Dispose();
clientSession = null;
- _logger.Trace("Stopped existing session of client '{0}'.", connectPacket.ClientId);
+ _logger.Verbose("Stopped existing session of client '{0}'.", connectPacket.ClientId);
}
else
{
- _logger.Trace("Reusing existing session of client '{0}'.", connectPacket.ClientId);
+ _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId);
}
}
@@ -302,7 +310,7 @@ namespace MQTTnet.Server
_sessions[connectPacket.ClientId] = clientSession;
- _logger.Trace("Created a new session for client '{0}'.", connectPacket.ClientId);
+ _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
}
return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
index 11cac24..f998557 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
@@ -132,7 +132,7 @@ namespace MQTTnet.Server
if (!saveIsRequired)
{
- _logger.Trace("Skipped saving retained messages because no changes were detected.");
+ _logger.Verbose("Skipped saving retained messages because no changes were detected.");
}
if (saveIsRequired && _options.Storage != null)
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
index de33800..6329442 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
@@ -138,10 +138,10 @@ namespace MQTTnet.Server
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client));
}
- private void OnClientDisconnected(ConnectedMqttClient client)
+ private void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect)
{
- _logger.Info("Client '{0}': Disconnected.", client.ClientId);
- ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client));
+ _logger.Info("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect);
+ ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect));
}
private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
@@ -162,7 +162,7 @@ namespace MQTTnet.Server
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
eventArgs.SessionTask = Task.Run(
- async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false),
+ () => _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token),
_cancellationTokenSource.Token);
}
}
diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs
index eb291f9..4fe3537 100644
--- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs
+++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapterFactory.cs
@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests
_adapter = adapter;
}
- public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger)
+ public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger)
{
return _adapter;
}
diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
index 0c66169..80a8e5a 100644
--- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
@@ -3,7 +3,6 @@ using System.Text;
using System.Threading.Tasks;
using MQTTnet.Protocol;
using MQTTnet.Server;
-using Newtonsoft.Json.Linq;
namespace MQTTnet.TestApp.NetCore
{
@@ -38,6 +37,12 @@ namespace MQTTnet.TestApp.NetCore
// based payload with the timestamp is a suitable use case.
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
}
+
+ if (context.ApplicationMessage.Topic == "not_allowed_topic")
+ {
+ context.AcceptPublish = false;
+ context.CloseConnection = true;
+ }
},
SubscriptionInterceptor = context =>
{
@@ -72,27 +77,27 @@ namespace MQTTnet.TestApp.NetCore
ConsoleColor.Magenta);
};
- options.ApplicationMessageInterceptor = c =>
- {
- if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
- {
- return;
- }
-
- try
- {
- var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
- var timestampProperty = content.Property("timestamp");
- if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
- {
- timestampProperty.Value = DateTime.Now.ToString("O");
- c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
- }
- }
- catch (Exception)
- {
- }
- };
+ //options.ApplicationMessageInterceptor = c =>
+ //{
+ // if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
+ // {
+ // return;
+ // }
+
+ // try
+ // {
+ // var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
+ // var timestampProperty = content.Property("timestamp");
+ // if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
+ // {
+ // timestampProperty.Value = DateTime.Now.ToString("O");
+ // c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
+ // }
+ // }
+ // catch (Exception)
+ // {
+ // }
+ //};
mqttServer.ClientDisconnected += (s, e) =>
{
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
index 3ed12bf..da0c2f3 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
@@ -13,6 +13,8 @@ using MQTTnet.Implementations;
using MQTTnet.ManagedClient;
using MQTTnet.Protocol;
using MQTTnet.Server;
+using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
+using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
namespace MQTTnet.TestApp.UniversalWindows
{
@@ -33,6 +35,11 @@ namespace MQTTnet.TestApp.UniversalWindows
private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
{
_traceMessages.Enqueue(e.TraceMessage);
+ await UpdateLogAsync();
+ }
+
+ private async Task UpdateLogAsync()
+ {
while (_traceMessages.Count > 100)
{
_traceMessages.TryDequeue(out _);
@@ -113,11 +120,15 @@ namespace MQTTnet.TestApp.UniversalWindows
{
await _mqttClient.DisconnectAsync();
_mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
+ _mqttClient.Connected -= OnConnected;
+ _mqttClient.Disconnected -= OnDisconnected;
}
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
+ _mqttClient.Connected += OnConnected;
+ _mqttClient.Disconnected += OnDisconnected;
await _mqttClient.ConnectAsync(options);
}
@@ -127,6 +138,22 @@ namespace MQTTnet.TestApp.UniversalWindows
}
}
+ private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
+ {
+ _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
+ "", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));
+
+ Task.Run(UpdateLogAsync);
+ }
+
+ private void OnConnected(object sender, MqttClientConnectedEventArgs e)
+ {
+ _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,
+ "", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));
+
+ Task.Run(UpdateLogAsync);
+ }
+
private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
{
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";