diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 3f5cd0b..0c48e8e 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -2,7 +2,7 @@ MQTTnet.AspNetCore - 2.7.1 + 2.7.3 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,13 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is a support library to integrate MQTTnet into AspNetCore. - * Updated to MQTTnet 2.7.1. + * Updated to MQTTnet 2.7.3. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index d3f11d5..59cb06a 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.7.2 + 2.7.3 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,11 +10,12 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Client] Added the subprotocol "mqtt" as default for web socket based connections. -* [Client] Added a new client setting called "KeepAliveSendInterval". It allows configuring the effective interval for sending ping requests. -* [Client] The client will no longer send ping requests if other packets are sent within the configured interval. -* [Server] The server now generates a valid packet identifier when disaptching publish packets to clients. -* [Core] Add several new extension methods. + * [Core] Add several new extension methods. +* [Client] Fixed an issue in _ManagedMqttClientOptionsBuilder_ when using _WithClientOptions_ and an options builder. +* [Client] Added the "IsStarted" property for the managed client. +* [Client] Optimized stream buffer for UWP apps. +* [Client] Added the _BufferSize_ to the TCP options. +* [Core] Fixed some issues in stream and socket handling. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs index f44d424..ac96ce6 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs @@ -6,6 +6,8 @@ public int? Port { get; set; } + public int BufferSize { get; set; } = 20 * 4096; + public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions(); } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs index c65f789..fce1629 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs @@ -15,12 +15,19 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttChannel, IDisposable { + // ReSharper disable once MemberCanBePrivate.Global + // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global + public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. + + private readonly int _bufferSize = BufferSize; private readonly MqttClientTcpOptions _options; + private StreamSocket _socket; public MqttTcpChannel(MqttClientTcpOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); + _bufferSize = _options.BufferSize; } public MqttTcpChannel(StreamSocket socket) @@ -69,14 +76,56 @@ namespace MQTTnet.Implementations public void Dispose() { - _socket?.Dispose(); - _socket = null; + try + { + SendStream?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + SendStream = null; + } + + try + { + ReceiveStream?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + ReceiveStream = null; + } + + try + { + _socket?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + _socket = null; + } } private void CreateStreams() { - SendStream = _socket.OutputStream.AsStreamForWrite(); - ReceiveStream = _socket.InputStream.AsStreamForRead(); + SendStream = _socket.OutputStream.AsStreamForWrite(_bufferSize); + ReceiveStream = _socket.InputStream.AsStreamForRead(_bufferSize); } private static Certificate LoadCertificate(MqttClientTcpOptions options) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index e908265..e39b551 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -14,15 +14,19 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttChannel, IDisposable { - private readonly MqttClientTcpOptions _options; - //todo: this can be used with min dependency NetStandard1.6 #if NET452 || NET461 // ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. + + private readonly int _bufferSize = BufferSize; +#else + private readonly int _bufferSize = 0; #endif + private readonly MqttClientTcpOptions _options; + private Socket _socket; private SslStream _sslStream; @@ -32,6 +36,7 @@ namespace MQTTnet.Implementations public MqttTcpChannel(MqttClientTcpOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); + _bufferSize = options.BufferSize; } /// @@ -43,7 +48,7 @@ namespace MQTTnet.Implementations _socket = socket ?? throw new ArgumentNullException(nameof(socket)); _sslStream = sslStream; - CreateStreams(socket, sslStream); + CreateStreams(); } public Stream SendStream { get; private set; } @@ -71,7 +76,7 @@ namespace MQTTnet.Implementations await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); } - CreateStreams(_socket, _sslStream); + CreateStreams(); } public Task DisconnectAsync() @@ -82,11 +87,70 @@ namespace MQTTnet.Implementations public void Dispose() { - _socket?.Dispose(); - _socket = null; + var oneStreamIsUsed = SendStream != null && ReceiveStream != null && ReferenceEquals(SendStream, ReceiveStream); + + try + { + SendStream?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + SendStream = null; + } + + try + { + if (!oneStreamIsUsed) + { + ReceiveStream?.Dispose(); + } + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + ReceiveStream = null; + } + + try + { + _sslStream?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + _sslStream = null; + } - _sslStream?.Dispose(); - _sslStream = null; + try + { + _socket?.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (NullReferenceException) + { + } + finally + { + _socket = null; + } } private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) @@ -136,20 +200,27 @@ namespace MQTTnet.Implementations return certificates; } - private void CreateStreams(Socket socket, Stream sslStream) + private void CreateStreams() { - var stream = sslStream ?? new NetworkStream(socket); + Stream stream; + if (_sslStream != null) + { + stream = _sslStream; + } + else + { + stream = new NetworkStream(_socket, true); + } //todo: if branch can be used with min dependency NetStandard1.6 #if NET452 || NET461 - SendStream = new BufferedStream(stream, BufferSize); - ReceiveStream = new BufferedStream(stream, BufferSize); + SendStream = new BufferedStream(stream, _bufferSize); + ReceiveStream = new BufferedStream(stream, _bufferSize); #else SendStream = stream; ReceiveStream = stream; #endif } - } } #endif diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 77d99a8..0e85c6a 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -96,8 +96,17 @@ namespace MQTTnet.Implementations public void Dispose() { - _webSocket?.Dispose(); - _webSocket = null; + try + { + _webSocket?.Dispose(); + } + catch (ObjectDisposedException) + { + } + finally + { + _webSocket = null; + } } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs index 6f435d5..97cdeca 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs @@ -7,6 +7,7 @@ namespace MQTTnet.ManagedClient { public interface IManagedMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable { + bool IsStarted { get; } bool IsConnected { get; } event EventHandler Connected; diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index ffb8639..4570319 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -40,6 +40,7 @@ namespace MQTTnet.ManagedClient } public bool IsConnected => _mqttClient.IsConnected; + public bool IsStarted => _connectionCancellationToken != null; public event EventHandler Connected; public event EventHandler Disconnected; diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs index d75117a..84b3785 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs @@ -21,6 +21,14 @@ namespace MQTTnet.ManagedClient return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); } + public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic) + { + if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + if (topic == null) throw new ArgumentNullException(nameof(topic)); + + return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); + } + public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params string[] topicFilters) { if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs index 38aadcf..4d65bdb 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs @@ -47,7 +47,7 @@ namespace MQTTnet.ManagedClient { if (options == null) throw new ArgumentNullException(nameof(options)); - if (_clientOptionsBuilder != null) + if (_clientOptionsBuilder == null) { _clientOptionsBuilder = new MqttClientOptionsBuilder(); }