From 69781d822a73fd921869ffce337438be62ce728a Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 26 Nov 2017 22:57:56 +0100 Subject: [PATCH] Update docs, fix managed client connection issue --- Build/MQTTnet.nuspec | 11 ++- .../Adapter/MqttChannelAdapter.cs | 15 +++- .../ManagedClient/ManagedMqttClient.cs | 73 ++++++++++++------- .../Server/ConnectedMqttClient.cs | 7 +- .../Server/MqttClientSessionsManager.cs | 19 +++-- .../MqttNetConsoleLogger.cs | 47 +++++++----- Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 21 +++++- .../MainPage.xaml.cs | 8 +- 8 files changed, 140 insertions(+), 61 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 2ebe29d..71a9d55 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,9 +10,14 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * Merged projects. -* Added a strong name for the assembly. - + * [Core] Merged projects (BREAKING CHANGE! But only namespace changes). +* [Core] Added a strong name for the assembly. +* [Core] Performance optimizations. +* [Core] Fixed a logging issue when dealing with IOExceptions. +* [Client] Fixed an issue in _ManagedClient_ which can cause the client to stop when publishing subscriptions. +* [Server] The application message interceptor can now delete any received application message. +* [Server] Added a ConnectionValidator context to align with other APIs. +* [Server] Added an interface for the _MqttServerOptions_. Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 58ab452..bf89fab 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Net.Sockets; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -171,13 +172,25 @@ namespace MQTTnet.Adapter } catch (COMException comException) { - if ((uint)comException.HResult == ErrorOperationAborted) + if ((uint) comException.HResult == ErrorOperationAborted) { throw new OperationCanceledException(); } throw new MqttCommunicationException(comException); } + catch (IOException exception) + { + if (exception.InnerException is SocketException socketException) + { + if (socketException.SocketErrorCode == SocketError.ConnectionAborted) + { + throw new OperationCanceledException(); + } + } + + throw new MqttCommunicationException(exception); + } catch (Exception exception) { throw new MqttCommunicationException(exception); diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index 90db1dc..7ae7250 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -142,33 +142,53 @@ namespace MQTTnet.ManagedClient { while (!cancellationToken.IsCancellationRequested) { - var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false); - if (connectionState == ReconnectionResult.NotConnected) - { - _publishingCancellationToken?.Cancel(false); - _publishingCancellationToken = null; + await TryMaintainConnectionAsync(cancellationToken); + } + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while maintaining connection."); + } + finally + { + await _mqttClient.DisconnectAsync().ConfigureAwait(false); + _logger.Info("Stopped"); + } + } - await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); - continue; - } + private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken) + { + try + { + var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false); + if (connectionState == ReconnectionResult.NotConnected) + { + _publishingCancellationToken?.Cancel(false); + _publishingCancellationToken = null; - if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) - { - await PushSubscriptionsAsync(); + await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); + return; + } + + if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) + { + await PushSubscriptionsAsync().ConfigureAwait(false); - _publishingCancellationToken = new CancellationTokenSource(); + _publishingCancellationToken = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token).ConfigureAwait(false); + Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token).ConfigureAwait(false), _publishingCancellationToken.Token).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - continue; - } + return; + } - if (connectionState == ReconnectionResult.StillConnected) - { - await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false); - } + if (connectionState == ReconnectionResult.StillConnected) + { + await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -182,11 +202,6 @@ namespace MQTTnet.ManagedClient { _logger.Error(exception, "Unhandled exception while maintaining connection."); } - finally - { - await _mqttClient.DisconnectAsync().ConfigureAwait(false); - _logger.Info("Stopped"); - } } private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) @@ -206,16 +221,19 @@ namespace MQTTnet.ManagedClient continue; } - await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) { } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while publishing queued application messages."); + } finally { - _logger.Info("Stopped publishing messages"); + _logger.Trace("Stopped publishing messages."); } } @@ -224,6 +242,7 @@ namespace MQTTnet.ManagedClient try { await _mqttClient.PublishAsync(message).ConfigureAwait(false); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } catch (MqttCommunicationException exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs index ea1c746..ee3a55f 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs @@ -1,4 +1,5 @@ -using MQTTnet.Serializer; +using System; +using MQTTnet.Serializer; namespace MQTTnet.Server { @@ -7,5 +8,9 @@ namespace MQTTnet.Server public string ClientId { get; set; } public MqttProtocolVersion ProtocolVersion { get; set; } + + public TimeSpan LastPacketReceivedDuration { get; set; } + + public TimeSpan LastNonKeepAlivePacketReceivedDuration{ get; set; } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index defe5eb..870e5e0 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -120,6 +120,7 @@ namespace MQTTnet.Server await _sessionsSemaphore.WaitAsync().ConfigureAwait(false); try { + var now = DateTime.UtcNow; return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient { ClientId = s.Value.ClientId, @@ -136,13 +137,21 @@ namespace MQTTnet.Server { try { - var interceptorContext = new MqttApplicationMessageInterceptorContext + if (_options.ApplicationMessageInterceptor != null) { - ApplicationMessage = applicationMessage - }; + var interceptorContext = new MqttApplicationMessageInterceptorContext + { + ApplicationMessage = applicationMessage + }; + + _options.ApplicationMessageInterceptor(interceptorContext); + applicationMessage = interceptorContext.ApplicationMessage; + } - _options.ApplicationMessageInterceptor?.Invoke(interceptorContext); - applicationMessage = interceptorContext.ApplicationMessage; + if (applicationMessage == null) + { + return; + } if (applicationMessage.Retain) { diff --git a/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs b/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs index 67eb69e..2713aa3 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs +++ b/Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs @@ -14,6 +14,17 @@ namespace MQTTnet.TestApp.NetCore MqttNetGlobalLogger.LogMessagePublished += PrintToConsole; } + public static void PrintToConsole(string message, ConsoleColor color) + { + lock (Lock) + { + var backupColor = Console.ForegroundColor; + Console.ForegroundColor = color; + Console.Write(message); + Console.ForegroundColor = backupColor; + } + } + private static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e) { var output = new StringBuilder(); @@ -23,28 +34,24 @@ namespace MQTTnet.TestApp.NetCore output.AppendLine(e.TraceMessage.Exception.ToString()); } - lock (Lock) + var color = ConsoleColor.Red; + switch (e.TraceMessage.Level) { - var backupColor = Console.ForegroundColor; - switch (e.TraceMessage.Level) - { - case MqttNetLogLevel.Error: - Console.ForegroundColor = ConsoleColor.Red; - break; - case MqttNetLogLevel.Warning: - Console.ForegroundColor = ConsoleColor.Yellow; - break; - case MqttNetLogLevel.Info: - Console.ForegroundColor = ConsoleColor.Green; - break; - case MqttNetLogLevel.Verbose: - Console.ForegroundColor = ConsoleColor.Gray; - break; - } - - Console.Write(output); - Console.ForegroundColor = backupColor; + case MqttNetLogLevel.Error: + color = ConsoleColor.Red; + break; + case MqttNetLogLevel.Warning: + color = ConsoleColor.Yellow; + break; + case MqttNetLogLevel.Info: + color = ConsoleColor.Green; + break; + case MqttNetLogLevel.Verbose: + color = ConsoleColor.Gray; + break; } + + PrintToConsole(output.ToString(), color); } } } \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index d4960d0..8f89176 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -1,9 +1,9 @@ using System; using System.Text; using System.Threading.Tasks; -using MQTTnet.Diagnostics; using MQTTnet.Protocol; using MQTTnet.Server; +using Newtonsoft.Json.Linq; namespace MQTTnet.TestApp.NetCore { @@ -63,6 +63,25 @@ namespace MQTTnet.TestApp.NetCore //options.TlsEndpointOptions.IsEnabled = false; var mqttServer = new MqttFactory().CreateMqttServer(); + + mqttServer.ApplicationMessageReceived += (s, e) => + { + MqttNetConsoleLogger.PrintToConsole( + $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}'", + ConsoleColor.Magenta); + }; + + options.ApplicationMessageInterceptor = c => + { + var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); + var timestampProperty = content.Property("timestamp"); + if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) + { + timestampProperty.Value = DateTime.Now.ToString("O"); + c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); + } + }; + mqttServer.ClientDisconnected += (s, e) => { Console.Write("Client disconnected event fired."); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index a750b27..00c6ffa 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -407,12 +407,14 @@ namespace MQTTnet.TestApp.UniversalWindows { // Configure MQTT server. + var optionsBuilder = new MqttServerOptionsBuilder() + .WithConnectionBacklog(100) + .WithDefaultEndpointPort(1884); + var options = new MqttServerOptions { - ConnectionBacklog = 100 }; - options.DefaultEndpointOptions.Port = 1884; options.ConnectionValidator = c => { if (c.ClientId != "Highlander") @@ -425,7 +427,7 @@ namespace MQTTnet.TestApp.UniversalWindows }; var mqttServer = new MqttFactory().CreateMqttServer(); - await mqttServer.StartAsync(options); + await mqttServer.StartAsync(optionsBuilder.Build()); } {