diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 2ebe29d..71a9d55 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -10,9 +10,14 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
- * Merged projects.
-* Added a strong name for the assembly.
-
+ * [Core] Merged projects (BREAKING CHANGE! But only namespace changes).
+* [Core] Added a strong name for the assembly.
+* [Core] Performance optimizations.
+* [Core] Fixed a logging issue when dealing with IOExceptions.
+* [Client] Fixed an issue in _ManagedClient_ which can cause the client to stop when publishing subscriptions.
+* [Server] The application message interceptor can now delete any received application message.
+* [Server] Added a ConnectionValidator context to align with other APIs.
+* [Server] Added an interface for the _MqttServerOptions_.
Copyright Christian Kratky 2016-2017
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
index 58ab452..bf89fab 100644
--- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
+++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
+using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -171,13 +172,25 @@ namespace MQTTnet.Adapter
}
catch (COMException comException)
{
- if ((uint)comException.HResult == ErrorOperationAborted)
+ if ((uint) comException.HResult == ErrorOperationAborted)
{
throw new OperationCanceledException();
}
throw new MqttCommunicationException(comException);
}
+ catch (IOException exception)
+ {
+ if (exception.InnerException is SocketException socketException)
+ {
+ if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
+ {
+ throw new OperationCanceledException();
+ }
+ }
+
+ throw new MqttCommunicationException(exception);
+ }
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
index 90db1dc..7ae7250 100644
--- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
@@ -142,33 +142,53 @@ namespace MQTTnet.ManagedClient
{
while (!cancellationToken.IsCancellationRequested)
{
- var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
- if (connectionState == ReconnectionResult.NotConnected)
- {
- _publishingCancellationToken?.Cancel(false);
- _publishingCancellationToken = null;
+ await TryMaintainConnectionAsync(cancellationToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception exception)
+ {
+ _logger.Error(exception, "Unhandled exception while maintaining connection.");
+ }
+ finally
+ {
+ await _mqttClient.DisconnectAsync().ConfigureAwait(false);
+ _logger.Info("Stopped");
+ }
+ }
- await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
- continue;
- }
+ private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
+ if (connectionState == ReconnectionResult.NotConnected)
+ {
+ _publishingCancellationToken?.Cancel(false);
+ _publishingCancellationToken = null;
- if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
- {
- await PushSubscriptionsAsync();
+ await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
+ {
+ await PushSubscriptionsAsync().ConfigureAwait(false);
- _publishingCancellationToken = new CancellationTokenSource();
+ _publishingCancellationToken = new CancellationTokenSource();
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
- Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token).ConfigureAwait(false);
+ Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token).ConfigureAwait(false), _publishingCancellationToken.Token).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
- continue;
- }
+ return;
+ }
- if (connectionState == ReconnectionResult.StillConnected)
- {
- await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false);
- }
+ if (connectionState == ReconnectionResult.StillConnected)
+ {
+ await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -182,11 +202,6 @@ namespace MQTTnet.ManagedClient
{
_logger.Error(exception, "Unhandled exception while maintaining connection.");
}
- finally
- {
- await _mqttClient.DisconnectAsync().ConfigureAwait(false);
- _logger.Info("Stopped");
- }
}
private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
@@ -206,16 +221,19 @@ namespace MQTTnet.ManagedClient
continue;
}
- await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
- await _storageManager.RemoveAsync(message).ConfigureAwait(false);
+ await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
}
+ catch (Exception exception)
+ {
+ _logger.Error(exception, "Unhandled exception while publishing queued application messages.");
+ }
finally
{
- _logger.Info("Stopped publishing messages");
+ _logger.Trace("Stopped publishing messages.");
}
}
@@ -224,6 +242,7 @@ namespace MQTTnet.ManagedClient
try
{
await _mqttClient.PublishAsync(message).ConfigureAwait(false);
+ await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
diff --git a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs
index ea1c746..ee3a55f 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs
@@ -1,4 +1,5 @@
-using MQTTnet.Serializer;
+using System;
+using MQTTnet.Serializer;
namespace MQTTnet.Server
{
@@ -7,5 +8,9 @@ namespace MQTTnet.Server
public string ClientId { get; set; }
public MqttProtocolVersion ProtocolVersion { get; set; }
+
+ public TimeSpan LastPacketReceivedDuration { get; set; }
+
+ public TimeSpan LastNonKeepAlivePacketReceivedDuration{ get; set; }
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
index defe5eb..870e5e0 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
@@ -120,6 +120,7 @@ namespace MQTTnet.Server
await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
+ var now = DateTime.UtcNow;
return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
@@ -136,13 +137,21 @@ namespace MQTTnet.Server
{
try
{
- var interceptorContext = new MqttApplicationMessageInterceptorContext
+ if (_options.ApplicationMessageInterceptor != null)
{
- ApplicationMessage = applicationMessage
- };
+ var interceptorContext = new MqttApplicationMessageInterceptorContext
+ {
+ ApplicationMessage = applicationMessage
+ };
+
+ _options.ApplicationMessageInterceptor(interceptorContext);
+ applicationMessage = interceptorContext.ApplicationMessage;
+ }
- _options.ApplicationMessageInterceptor?.Invoke(interceptorContext);
- applicationMessage = interceptorContext.ApplicationMessage;
+ if (applicationMessage == null)
+ {
+ return;
+ }
if (applicationMessage.Retain)
{
diff --git a/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs b/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs
index 67eb69e..2713aa3 100644
--- a/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs
@@ -14,6 +14,17 @@ namespace MQTTnet.TestApp.NetCore
MqttNetGlobalLogger.LogMessagePublished += PrintToConsole;
}
+ public static void PrintToConsole(string message, ConsoleColor color)
+ {
+ lock (Lock)
+ {
+ var backupColor = Console.ForegroundColor;
+ Console.ForegroundColor = color;
+ Console.Write(message);
+ Console.ForegroundColor = backupColor;
+ }
+ }
+
private static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e)
{
var output = new StringBuilder();
@@ -23,28 +34,24 @@ namespace MQTTnet.TestApp.NetCore
output.AppendLine(e.TraceMessage.Exception.ToString());
}
- lock (Lock)
+ var color = ConsoleColor.Red;
+ switch (e.TraceMessage.Level)
{
- var backupColor = Console.ForegroundColor;
- switch (e.TraceMessage.Level)
- {
- case MqttNetLogLevel.Error:
- Console.ForegroundColor = ConsoleColor.Red;
- break;
- case MqttNetLogLevel.Warning:
- Console.ForegroundColor = ConsoleColor.Yellow;
- break;
- case MqttNetLogLevel.Info:
- Console.ForegroundColor = ConsoleColor.Green;
- break;
- case MqttNetLogLevel.Verbose:
- Console.ForegroundColor = ConsoleColor.Gray;
- break;
- }
-
- Console.Write(output);
- Console.ForegroundColor = backupColor;
+ case MqttNetLogLevel.Error:
+ color = ConsoleColor.Red;
+ break;
+ case MqttNetLogLevel.Warning:
+ color = ConsoleColor.Yellow;
+ break;
+ case MqttNetLogLevel.Info:
+ color = ConsoleColor.Green;
+ break;
+ case MqttNetLogLevel.Verbose:
+ color = ConsoleColor.Gray;
+ break;
}
+
+ PrintToConsole(output.ToString(), color);
}
}
}
\ No newline at end of file
diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
index d4960d0..8f89176 100644
--- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs
@@ -1,9 +1,9 @@
using System;
using System.Text;
using System.Threading.Tasks;
-using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
+using Newtonsoft.Json.Linq;
namespace MQTTnet.TestApp.NetCore
{
@@ -63,6 +63,25 @@ namespace MQTTnet.TestApp.NetCore
//options.TlsEndpointOptions.IsEnabled = false;
var mqttServer = new MqttFactory().CreateMqttServer();
+
+ mqttServer.ApplicationMessageReceived += (s, e) =>
+ {
+ MqttNetConsoleLogger.PrintToConsole(
+ $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}'",
+ ConsoleColor.Magenta);
+ };
+
+ options.ApplicationMessageInterceptor = c =>
+ {
+ var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
+ var timestampProperty = content.Property("timestamp");
+ if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
+ {
+ timestampProperty.Value = DateTime.Now.ToString("O");
+ c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
+ }
+ };
+
mqttServer.ClientDisconnected += (s, e) =>
{
Console.Write("Client disconnected event fired.");
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
index a750b27..00c6ffa 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
@@ -407,12 +407,14 @@ namespace MQTTnet.TestApp.UniversalWindows
{
// Configure MQTT server.
+ var optionsBuilder = new MqttServerOptionsBuilder()
+ .WithConnectionBacklog(100)
+ .WithDefaultEndpointPort(1884);
+
var options = new MqttServerOptions
{
- ConnectionBacklog = 100
};
- options.DefaultEndpointOptions.Port = 1884;
options.ConnectionValidator = c =>
{
if (c.ClientId != "Highlander")
@@ -425,7 +427,7 @@ namespace MQTTnet.TestApp.UniversalWindows
};
var mqttServer = new MqttFactory().CreateMqttServer();
- await mqttServer.StartAsync(options);
+ await mqttServer.StartAsync(optionsBuilder.Build());
}
{