diff --git a/.gitignore b/.gitignore index 39550dc..4372f8e 100644 --- a/.gitignore +++ b/.gitignore @@ -197,7 +197,7 @@ ClientBin/ *.dbmdl *.dbproj.schemaview *.jfm -*.pfx +# *.pfx *.publishsettings orleans.codegen.cs diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 5cd5d55..45bf545 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -36,7 +36,7 @@ namespace MQTTnet.AspNetCore if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); var channel = new MqttWebSocketServerChannel(webSocket); - var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(channel); + var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerCommunicationAdapter(channel); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); ClientAccepted?.Invoke(this, eventArgs); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs similarity index 78% rename from Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttServerAdapter.cs rename to Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs index 7d9d25f..658cfe8 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.Uwp.cs @@ -1,5 +1,4 @@ -#if NET451 || NETSTANDARD1_3 -#else +#if WINDOWS_UWP using System; using System.Threading.Tasks; using MQTTnet.Core.Adapter; @@ -12,13 +11,13 @@ namespace MQTTnet.Implementations public class MqttServerAdapter : IMqttServerAdapter, IDisposable { private readonly ILogger _logger; - private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; + private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; private StreamSocketListener _defaultEndpointSocket; - public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) + public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory communicationAdapterFactory) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); + _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -32,7 +31,6 @@ namespace MQTTnet.Implementations if (options.DefaultEndpointOptions.IsEnabled) { _defaultEndpointSocket = new StreamSocketListener(); - _defaultEndpointSocket.Control.NoDelay = true; await _defaultEndpointSocket.BindServiceNameAsync(options.GetDefaultEndpointPort().ToString(), SocketProtectionLevel.PlainSocket); _defaultEndpointSocket.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync; } @@ -45,6 +43,11 @@ namespace MQTTnet.Implementations public Task StopAsync() { + if (_defaultEndpointSocket != null) + { + _defaultEndpointSocket.ConnectionReceived -= AcceptDefaultEndpointConnectionsAsync; + } + _defaultEndpointSocket?.Dispose(); _defaultEndpointSocket = null; @@ -60,9 +63,7 @@ namespace MQTTnet.Implementations { try { - args.Socket.Control.NoDelay = true; - - var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(args.Socket)); + var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(args.Socket)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/NetStandard/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs similarity index 90% rename from Frameworks/MQTTnet.NetStandard/Implementations/NetStandard/MqttServerAdapter.cs rename to Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 1a6e5b4..9d807b8 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/NetStandard/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -1,5 +1,4 @@ #if NET451 || NETSTANDARD1_3 - using System; using System.Net; using System.Net.Security; @@ -17,17 +16,17 @@ namespace MQTTnet.Implementations public class MqttServerAdapter : IMqttServerAdapter, IDisposable { private readonly ILogger _logger; - private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; + private readonly IMqttCommunicationAdapterFactory _communicationAdapterFactory; private CancellationTokenSource _cancellationTokenSource; private Socket _defaultEndpointSocket; private Socket _tlsEndpointSocket; private X509Certificate2 _tlsCertificate; - public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) + public MqttServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory communicationAdapterFactory) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); + _communicationAdapterFactory = communicationAdapterFactory ?? throw new ArgumentNullException(nameof(communicationAdapterFactory)); } public event EventHandler ClientAccepted; @@ -104,7 +103,7 @@ namespace MQTTnet.Implementations #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(clientSocket, null)); + var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, null)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) @@ -132,7 +131,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream)); + var clientAdapter = _communicationAdapterFactory.CreateServerCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream)); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs similarity index 99% rename from Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttTcpChannel.cs rename to Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs index 6e311d6..bd0614f 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/Uap/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs @@ -1,5 +1,4 @@ -#if NET451 || NETSTANDARD1_3 -#else +#if WINDOWS_UWP using System; using System.Collections.Generic; using System.IO; diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/NetStandard/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs similarity index 100% rename from Frameworks/MQTTnet.NetStandard/Implementations/NetStandard/MqttTcpChannel.cs rename to Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index c120775..1314a1c 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -63,7 +63,7 @@ namespace MQTTnet.Implementations } } - await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None); + await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false); RawReceiveStream = new WebSocketStream(_webSocket); } diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index 50cddde..432a371 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -45,20 +45,20 @@ namespace MQTTnet return _serviceProvider.GetRequiredService(); } - public IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options) + public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options) { var logger = _serviceProvider.GetRequiredService>(); - return new MqttChannelCommunicationAdapter(CreateMqttCommunicationChannel(options.ChannelOptions), CreateSerializer(options.ProtocolVersion), logger); + return new MqttChannelCommunicationAdapter(CreateCommunicationChannel(options.ChannelOptions), CreateSerializer(options.ProtocolVersion), logger); } - public IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel) + public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel) { var serializer = _serviceProvider.GetRequiredService(); var logger = _serviceProvider.GetRequiredService>(); return new MqttChannelCommunicationAdapter(channel, serializer, logger); } - public IMqttCommunicationChannel CreateMqttCommunicationChannel(IMqttClientChannelOptions options) + public IMqttCommunicationChannel CreateCommunicationChannel(IMqttClientChannelOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs index ffcfd2c..9609881 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapterFactory.cs @@ -5,8 +5,8 @@ namespace MQTTnet.Core.Adapter { public interface IMqttCommunicationAdapterFactory { - IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options); + IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options); - IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel); + IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index d16575b..a8e4eba 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client _latestPacketIdentifier = 0; _packetDispatcher.Reset(); - _adapter = _communicationAdapterFactory.CreateClientMqttCommunicationAdapter(options); + _adapter = _communicationAdapterFactory.CreateClientCommunicationAdapter(options); _scopeHandle = _logger.BeginScope(options.LogId ?? options.ClientId); _logger.LogTrace("Trying to connect with server."); diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 417c19a..ce80cd5 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; @@ -13,7 +14,7 @@ namespace MQTTnet.Core.Serializer public sealed class MqttPacketReader : BinaryReader { private readonly ReceivedMqttPacket _receivedMqttPacket; - + public MqttPacketReader(ReceivedMqttPacket receivedMqttPacket) : base(receivedMqttPacket.Body, Encoding.UTF8, true) { @@ -70,6 +71,8 @@ namespace MQTTnet.Core.Serializer var multiplier = 1; var value = 0; byte encodedByte; + + var readBytes = new List(); do { if (cancellationToken.IsCancellationRequested) @@ -77,14 +80,23 @@ namespace MQTTnet.Core.Serializer throw new TaskCanceledException(); } - encodedByte = (byte)stream.ReadByte(); - value += (encodedByte & 127) * multiplier; + var buffer = stream.ReadByte(); + readBytes.Add(buffer); + + ////if (buffer == -1) + ////{ + //// break; + ////} + + encodedByte = (byte)buffer; + value += (byte)(encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) { - throw new MqttProtocolViolationException("Remaining length is invalid."); + throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", readBytes)})."); } } while ((encodedByte & 128) != 0); + return value; } } diff --git a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs index 7e3a7dc..0e56411 100644 --- a/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs @@ -13,12 +13,12 @@ namespace MQTTnet.Core.Tests _adapter = adapter; } - public IMqttCommunicationAdapter CreateClientMqttCommunicationAdapter(IMqttClientOptions options) + public IMqttCommunicationAdapter CreateClientCommunicationAdapter(IMqttClientOptions options) { return _adapter; } - public IMqttCommunicationAdapter CreateServerMqttCommunicationAdapter(IMqttCommunicationChannel channel) + public IMqttCommunicationAdapter CreateServerCommunicationAdapter(IMqttCommunicationChannel channel) { return _adapter; } diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index 753bac9..096f72f 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -18,7 +18,7 @@ {A5A43C5B-DE2A-4C0C-9213-0A381AF9435A};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} true MQTTnet.TestApp.UniversalWindows_TemporaryKey.pfx - 13E377A693C923EE9E88EE84B32A4B9881657362 + 05F70CAF1426E296BE2F7396F0B654070D72B930 win10;win10-arm;win10-arm-aot;win10-x86;win10-x86-aot;win10-x64;win10-x64-aot diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows_TemporaryKey.pfx b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows_TemporaryKey.pfx new file mode 100644 index 0000000..1adc887 Binary files /dev/null and b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows_TemporaryKey.pfx differ diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index f0031b8..e2d58be 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -27,7 +27,9 @@ ClientId: - + Clean session: + + TCP WS diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 76c0d07..ab46740 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -81,6 +81,8 @@ namespace MQTTnet.TestApp.UniversalWindows Password = Password.Text }; + options.CleanSession = CleanSession.IsChecked == true; + try { if (_mqttClient != null) @@ -226,6 +228,7 @@ namespace MQTTnet.TestApp.UniversalWindows .WithTcpServer("broker.hivemq.com") .WithCredentials("bud", "%spencer%") .WithTls() + .WithCleanSession() .Build(); await client.ConnectAsync(options); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/Package.appxmanifest b/Tests/MQTTnet.TestApp.UniversalWindows/Package.appxmanifest index d6a7c4c..b677c2a 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/Package.appxmanifest +++ b/Tests/MQTTnet.TestApp.UniversalWindows/Package.appxmanifest @@ -1,10 +1,10 @@  - + MQTTnet.TestApp.UniversalWindows - chris + test Assets\StoreLogo.png