diff --git a/Build/MQTTnet.Extensions.WebSocket4Net.nuspec b/Build/MQTTnet.Extensions.WebSocket4Net.nuspec new file mode 100644 index 0000000..054933e --- /dev/null +++ b/Build/MQTTnet.Extensions.WebSocket4Net.nuspec @@ -0,0 +1,42 @@ + + + + MQTTnet.Extensions.WebSocket4Net + 0.0.0 + Christian Kratky + Christian Kratky + LICENSE + https://github.com/chkr1011/MQTTnet + https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png + false + This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients. + For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/). + Copyright Christian Kratky 2016-2019 + MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 3a8585f..543719c 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -12,6 +12,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). * [Core] Fixed issues in MQTTv5 message encoding and decoding. +* [Core] Added extension method to allow usage of _WebSocket4Net_ in clients to fix issues with AWS and Xamarin. * [Client] Added support for extended authentication exchange. * [Client] Exposed MQTTv5 CONNACK values to client. * [Client] Added _MqttClientSubscribeOptionsBuilder_. diff --git a/Build/build.ps1 b/Build/build.ps1 index dd605dd..ff439f0 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -36,6 +36,14 @@ Write-Host &$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" &$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" +# Build the WebSocket4Net extension +&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" +&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" +&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" +&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" +&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + + # Build MQTTnet.Server Remove-Item ..\Source\MQTTnet.Server\bin\Release\netcoreapp2.2 -Recurse -Force -ErrorAction SilentlyContinue &$msbuild ..\Source\MQTTnet.Server\MQTTnet.Server.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.2" /p:publishprofile=FolderProfile /p:deployonbuild=true /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" @@ -55,6 +63,8 @@ Copy-Item MQTTnet.Extensions.Rpc.nuspec -Destination MQTTnet.Extensions.Rpc.nusp (Get-Content MQTTnet.Extensions.Rpc.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.Rpc.nuspec Copy-Item MQTTnet.Extensions.ManagedClient.nuspec -Destination MQTTnet.Extensions.ManagedClient.nuspec.old -Force (Get-Content MQTTnet.Extensions.ManagedClient.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.ManagedClient.nuspec +Copy-Item MQTTnet.Extensions.WebSocket4Net.nuspec -Destination MQTTnet.Extensions.WebSocket4Net.nuspec.old -Force +(Get-Content MQTTnet.Extensions.WebSocket4Net.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.WebSocket4Net.nuspec New-Item -ItemType Directory -Force -Path .\NuGet .\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion @@ -62,7 +72,9 @@ New-Item -ItemType Directory -Force -Path .\NuGet .\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion .\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion .\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion +.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion Move-Item MQTTnet.AspNetCore.nuspec.old -Destination MQTTnet.AspNetCore.nuspec -Force Move-Item MQTTnet.Extensions.Rpc.nuspec.old -Destination MQTTnet.Extensions.Rpc.nuspec -Force -Move-Item MQTTnet.Extensions.ManagedClient.nuspec.old -Destination MQTTnet.Extensions.ManagedClient.nuspec -Force \ No newline at end of file +Move-Item MQTTnet.Extensions.ManagedClient.nuspec.old -Destination MQTTnet.Extensions.ManagedClient.nuspec -Force +Move-Item MQTTnet.Extensions.WebSocket4Net.nuspec.old -Destination MQTTnet.Extensions.WebSocket4Net.nuspec -Force \ No newline at end of file diff --git a/MQTTnet.sln b/MQTTnet.sln index 28d5dca..4504baa 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 Build\MQTTnet.AspNetCore.nuspec = Build\MQTTnet.AspNetCore.nuspec Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec + Build\MQTTnet.Extensions.WebSocket4Net.nuspec = Build\MQTTnet.Extensions.WebSocket4Net.nuspec Build\MQTTnet.NETStandard.nuspec = Build\MQTTnet.NETStandard.nuspec Build\MQTTnet.nuspec = Build\MQTTnet.nuspec Build\upload.ps1 = Build\upload.ps1 @@ -50,6 +51,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Server", "Source\MQTTnet.Server\MQTTnet.Server.csproj", "{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.WebSocket4Net", "Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj", "{2BD01D53-4CA5-4142-BE8D-313876395E3E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -245,6 +248,22 @@ Global {5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x64.Build.0 = Release|Any CPU {5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x86.ActiveCfg = Release|Any CPU {5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x86.Build.0 = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|ARM.ActiveCfg = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|ARM.Build.0 = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x64.ActiveCfg = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x64.Build.0 = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x86.ActiveCfg = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x86.Build.0 = Debug|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|Any CPU.Build.0 = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|ARM.ActiveCfg = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|ARM.Build.0 = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x64.ActiveCfg = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x64.Build.0 = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x86.ActiveCfg = Release|Any CPU + {2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -261,6 +280,7 @@ Global {C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {2BD01D53-4CA5-4142-BE8D-313876395E3E} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/MQTTnet.Extensions.WebSocket4Net.csproj b/Source/MQTTnet.Extensions.WebSocket4Net/MQTTnet.Extensions.WebSocket4Net.csproj new file mode 100644 index 0000000..b2aade0 --- /dev/null +++ b/Source/MQTTnet.Extensions.WebSocket4Net/MQTTnet.Extensions.WebSocket4Net.csproj @@ -0,0 +1,34 @@ + + + + netstandard1.3;netstandard2.0 + $(TargetFrameworks);net452;net461 + $(TargetFrameworks);uap10.0 + + + + + + + + false + UAP,Version=v10.0 + UAP + 10.0.17134.0 + 10.0.10240.0 + .NETCore + v5.0 + $(DefineConstants);WINDOWS_UWP + en + $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets + + + + + + + + + + + diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs b/Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs new file mode 100644 index 0000000..e593a88 --- /dev/null +++ b/Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs @@ -0,0 +1,14 @@ +using System; + +namespace MQTTnet.Extensions.WebSocket4Net +{ + public static class MqttFactoryExtensions + { + public static IMqttFactory UseWebSocket4Net(this IMqttFactory mqttFactory) + { + if (mqttFactory == null) throw new ArgumentNullException(nameof(mqttFactory)); + + return mqttFactory.UseClientAdapterFactory(new WebSocket4NetMqttClientAdapterFactory()); + } + } +} diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs new file mode 100644 index 0000000..c3c6be2 --- /dev/null +++ b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs @@ -0,0 +1,231 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Channel; +using MQTTnet.Client.Options; +using MQTTnet.Exceptions; +using MQTTnet.Internal; +using SuperSocket.ClientEngine; +using WebSocket4Net; + +namespace MQTTnet.Extensions.WebSocket4Net +{ + public class WebSocket4NetMqttChannel : IMqttChannel + { + private readonly BlockingCollection _receiveBuffer = new BlockingCollection(); + + private readonly IMqttClientOptions _clientOptions; + private readonly MqttClientWebSocketOptions _webSocketOptions; + + private WebSocket _webSocket; + + public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions, MqttClientWebSocketOptions webSocketOptions) + { + _clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions)); + _webSocketOptions = webSocketOptions ?? throw new ArgumentNullException(nameof(webSocketOptions)); + } + + public string Endpoint => _webSocketOptions.Uri; + + public bool IsSecureConnection { get; private set; } + + public async Task ConnectAsync(CancellationToken cancellationToken) + { + var uri = _webSocketOptions.Uri; + if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) && !uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase)) + { + if (_webSocketOptions.TlsOptions?.UseTls == false) + { + uri = "ws://" + uri; + } + else + { + uri = "wss://" + uri; + } + } + + var sslProtocols = _webSocketOptions?.TlsOptions?.SslProtocol ?? SslProtocols.None; + var subProtocol = _webSocketOptions.SubProtocols.FirstOrDefault() ?? string.Empty; + + var cookies = new List>(); + if (_webSocketOptions.CookieContainer != null) + { + throw new NotSupportedException("Cookies are not supported."); + } + + List> customHeaders = null; + if (_webSocketOptions.RequestHeaders != null) + { + customHeaders = _webSocketOptions.RequestHeaders.Select(i => new KeyValuePair(i.Key, i.Value)).ToList(); + } + + EndPoint proxy = null; + if (_webSocketOptions.ProxyOptions != null) + { + throw new NotSupportedException("Proxies are not supported."); + } + + // The user agent can be empty always because it is just added to the custom headers as "User-Agent". + var userAgent = string.Empty; + + var origin = string.Empty; + var webSocketVersion = WebSocketVersion.None; + var receiveBufferSize = 0; + + var certificates = new X509CertificateCollection(); + if (_webSocketOptions?.TlsOptions?.Certificates != null) + { + foreach (var certificate in _webSocketOptions.TlsOptions.Certificates) + { + certificates.Add(new X509Certificate(certificate)); + } + } + + _webSocket = new WebSocket(uri, subProtocol, cookies, customHeaders, userAgent, origin, webSocketVersion, proxy, sslProtocols, receiveBufferSize) + { + NoDelay = true, + Security = + { + AllowUnstrustedCertificate = _webSocketOptions?.TlsOptions?.AllowUntrustedCertificates == true, + AllowCertificateChainErrors = _webSocketOptions?.TlsOptions?.IgnoreCertificateChainErrors == true, + Certificates = certificates + } + }; + + await ConnectInternalAsync(cancellationToken).ConfigureAwait(false); + + IsSecureConnection = uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase); + } + + public Task DisconnectAsync(CancellationToken cancellationToken) + { + if (_webSocket != null && _webSocket.State == WebSocketState.Open) + { + _webSocket.Close(); + } + + return Task.FromResult(0); + } + + public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var readBytes = 0; + while (count > 0 && !cancellationToken.IsCancellationRequested) + { + if (!_receiveBuffer.TryTake(out var @byte)) + { + if (readBytes == 0) + { + // Block until at least one byte was received. + @byte = _receiveBuffer.Take(cancellationToken); + } + else + { + return Task.FromResult(readBytes); + } + } + + buffer[offset] = @byte; + offset++; + count--; + readBytes++; + } + + return Task.FromResult(readBytes); + } + + public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + _webSocket.Send(buffer, offset, count); + return Task.FromResult(0); + } + + public void Dispose() + { + if (_webSocket == null) + { + return; + } + + _webSocket.DataReceived -= OnDataReceived; + _webSocket.Error -= OnError; + _webSocket.Dispose(); + _webSocket = null; + } + + private void OnError(object sender, ErrorEventArgs e) + { + System.Diagnostics.Debug.Write(e.Exception.ToString()); + } + + private void OnDataReceived(object sender, DataReceivedEventArgs e) + { + foreach (var @byte in e.Data) + { + _receiveBuffer.Add(@byte); + } + } + + private async Task ConnectInternalAsync(CancellationToken cancellationToken) + { + _webSocket.Error += OnError; + _webSocket.DataReceived += OnDataReceived; + + var taskCompletionSource = new TaskCompletionSource(); + + void ErrorHandler(object sender, ErrorEventArgs e) + { + taskCompletionSource.TrySetResult(e.Exception); + } + + void SuccessHandler(object sender, EventArgs e) + { + taskCompletionSource.TrySetResult(null); + } + + try + { + _webSocket.Opened += SuccessHandler; + _webSocket.Error += ErrorHandler; + + _webSocket.Open(); + + var exception = await MqttTaskTimeout.WaitAsync(c => + { + c.Register(() => taskCompletionSource.TrySetCanceled()); + return taskCompletionSource.Task; + }, _clientOptions.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + + if (exception != null) + { + if (exception is AuthenticationException authenticationException) + { + throw new MqttCommunicationException(authenticationException.InnerException); + } + + if (exception is OperationCanceledException) + { + throw new MqttCommunicationTimedOutException(); + } + + throw new MqttCommunicationException(exception); + } + } + catch (OperationCanceledException) + { + throw new MqttCommunicationTimedOutException(); + } + finally + { + _webSocket.Opened -= SuccessHandler; + _webSocket.Error -= ErrorHandler; + } + } + } +} diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs new file mode 100644 index 0000000..7625fc9 --- /dev/null +++ b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs @@ -0,0 +1,36 @@ +using System; +using MQTTnet.Adapter; +using MQTTnet.Client.Options; +using MQTTnet.Diagnostics; +using MQTTnet.Formatter; +using MQTTnet.Implementations; + +namespace MQTTnet.Extensions.WebSocket4Net +{ + public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory + { + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + switch (options.ChannelOptions) + { + case MqttClientTcpOptions _: + { + return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } + + case MqttClientWebSocketOptions webSocketOptions: + { + return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } + + default: + { + throw new NotSupportedException(); + } + } + } + } +} diff --git a/Source/MQTTnet/Client/IMqttClientFactory.cs b/Source/MQTTnet/Client/IMqttClientFactory.cs index 95d567e..bae75ac 100644 --- a/Source/MQTTnet/Client/IMqttClientFactory.cs +++ b/Source/MQTTnet/Client/IMqttClientFactory.cs @@ -5,6 +5,8 @@ namespace MQTTnet.Client { public interface IMqttClientFactory { + IMqttFactory UseClientAdapterFactory(IMqttClientAdapterFactory clientAdapterFactory); + IMqttClient CreateMqttClient(); IMqttClient CreateMqttClient(IMqttNetLogger logger); diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index ab6cc76..cb835f6 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -10,6 +10,8 @@ namespace MQTTnet { public class MqttFactory : IMqttFactory { + private IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory(); + public MqttFactory() : this(new MqttNetLogger()) { } @@ -21,6 +23,12 @@ namespace MQTTnet public IMqttNetLogger DefaultLogger { get; } + public IMqttFactory UseClientAdapterFactory(IMqttClientAdapterFactory clientAdapterFactory) + { + _clientAdapterFactory = clientAdapterFactory ?? throw new ArgumentNullException(nameof(clientAdapterFactory)); + return this; + } + public IMqttClient CreateMqttClient() { return CreateMqttClient(DefaultLogger); @@ -30,22 +38,22 @@ namespace MQTTnet { if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new MqttClient(new MqttClientAdapterFactory(), logger); + return new MqttClient(_clientAdapterFactory, logger); } - public IMqttClient CreateMqttClient(IMqttClientAdapterFactory adapterFactory) + public IMqttClient CreateMqttClient(IMqttClientAdapterFactory clientAdapterFactory) { - if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory)); + if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory)); - return new MqttClient(adapterFactory, DefaultLogger); + return new MqttClient(clientAdapterFactory, DefaultLogger); } - public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory adapterFactory) + public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactory) { if (logger == null) throw new ArgumentNullException(nameof(logger)); - if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory)); + if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory)); - return new MqttClient(adapterFactory, logger); + return new MqttClient(clientAdapterFactory, logger); } public IMqttServer CreateMqttServer() @@ -60,19 +68,19 @@ namespace MQTTnet return CreateMqttServer(new List { new MqttTcpServerAdapter(logger.CreateChildLogger()) }, logger); } - public IMqttServer CreateMqttServer(IEnumerable adapters, IMqttNetLogger logger) + public IMqttServer CreateMqttServer(IEnumerable serverAdapters, IMqttNetLogger logger) { - if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters)); if (logger == null) throw new ArgumentNullException(nameof(logger)); - return new MqttServer(adapters, logger.CreateChildLogger()); + return new MqttServer(serverAdapters, logger.CreateChildLogger()); } - public IMqttServer CreateMqttServer(IEnumerable adapters) + public IMqttServer CreateMqttServer(IEnumerable serverAdapters) { - if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters)); - return new MqttServer(adapters, DefaultLogger.CreateChildLogger()); + return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger()); } } } \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index 1900203..f3114e3 100644 --- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -17,6 +17,7 @@ + diff --git a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs index ff4e337..58c4ad1 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs @@ -1,10 +1,13 @@ using MQTTnet.Client; using System; using System.IO; +using System.Net; +using System.Security.Authentication; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; +using MQTTnet.Extensions.WebSocket4Net; using MQTTnet.Formatter; using MQTTnet.Protocol; using Newtonsoft.Json; @@ -77,6 +80,7 @@ namespace MQTTnet.TestApp.NetCore { Write("Testing '" + name + "'... ", ConsoleColor.Gray); var factory = new MqttFactory(); + //factory.UseWebSocket4Net(); var client = factory.CreateMqttClient(); var topic = Guid.NewGuid().ToString(); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index c13db93..243c9c5 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -135,6 +135,10 @@ {c444e9c8-95fa-430e-9126-274129de16cd} MQTTnet.Extensions.Rpc + + {2BD01D53-4CA5-4142-BE8D-313876395E3E} + MQTTnet.Extensions.WebSocket4Net + {3587e506-55a2-4eb3-99c7-dc01e42d25d2} MQTTnet diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index 210f059..11e68c6 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -63,6 +63,7 @@ TCP WS + WS (WebSocket4Net) Use TLS diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 226fcf2..899ee21 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.ObjectModel; -using System.Net.Security; -using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Tasks; using Windows.Security.Cryptography.Certificates; @@ -23,6 +21,7 @@ using MQTTnet.Server; using MQTTnet.Server.Status; using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs; using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs; +using MQTTnet.Extensions.WebSocket4Net; namespace MQTTnet.TestApp.UniversalWindows { @@ -83,6 +82,8 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Connect(object sender, RoutedEventArgs e) { + var mqttFactory = new MqttFactory(); + var tlsOptions = new MqttClientTlsOptions { UseTls = UseTls.IsChecked == true, @@ -116,6 +117,17 @@ namespace MQTTnet.TestApp.UniversalWindows }; } + if (UseWs4Net.IsChecked == true) + { + options.ChannelOptions = new MqttClientWebSocketOptions + { + Uri = Server.Text, + TlsOptions = tlsOptions + }; + + mqttFactory.UseWebSocket4Net(); + } + if (options.ChannelOptions == null) { throw new InvalidOperationException(); @@ -156,11 +168,9 @@ namespace MQTTnet.TestApp.UniversalWindows _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); } - var factory = new MqttFactory(); - if (UseManagedClient.IsChecked == true) { - _managedMqttClient = factory.CreateManagedMqttClient(); + _managedMqttClient = mqttFactory.CreateManagedMqttClient(); _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x)); _managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); @@ -172,7 +182,7 @@ namespace MQTTnet.TestApp.UniversalWindows } else { - _mqttClient = factory.CreateMqttClient(); + _mqttClient = mqttFactory.CreateMqttClient(); _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x)); _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x)); @@ -204,7 +214,7 @@ namespace MQTTnet.TestApp.UniversalWindows private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs) { - var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; + var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {eventArgs.ApplicationMessage.ConvertPayloadToString()} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () => { @@ -524,6 +534,13 @@ namespace MQTTnet.TestApp.UniversalWindows Console.WriteLine(); }); + void Handler(MqttApplicationMessageReceivedEventArgs args) + { + //... + } + + client.UseApplicationMessageReceivedHandler(Handler); + // Subscribe after connect client.UseConnectedHandler(async e =>