Browse Source

Add extension library for WebSocket4Net usage.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
d1a17314ef
15 changed files with 448 additions and 21 deletions
  1. +42
    -0
      Build/MQTTnet.Extensions.WebSocket4Net.nuspec
  2. +1
    -0
      Build/MQTTnet.nuspec
  3. +13
    -1
      Build/build.ps1
  4. +20
    -0
      MQTTnet.sln
  5. +34
    -0
      Source/MQTTnet.Extensions.WebSocket4Net/MQTTnet.Extensions.WebSocket4Net.csproj
  6. +14
    -0
      Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs
  7. +231
    -0
      Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs
  8. +36
    -0
      Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs
  9. +2
    -0
      Source/MQTTnet/Client/IMqttClientFactory.cs
  10. +21
    -13
      Source/MQTTnet/MqttFactory.cs
  11. +1
    -0
      Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
  12. +4
    -0
      Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
  13. +4
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  14. +1
    -0
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  15. +24
    -7
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 42
- 0
Build/MQTTnet.Extensions.WebSocket4Net.nuspec View File

@@ -0,0 +1,42 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>MQTTnet.Extensions.WebSocket4Net</id>
<version>0.0.0</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<license type="file">LICENSE</license>
<projectUrl>https://github.com/chkr1011/MQTTnet</projectUrl>
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows using _WebSocket4Net_ as transport for MQTTnet clients.</description>
<releaseNotes>For release notes please go to MQTTnet release notes (https://www.nuget.org/packages/MQTTnet/).</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="$nugetVersion" />
</dependencies>
</metadata>

<files>
<!-- License -->
<file src="..\LICENSE" />

<!-- .NET Standard 1.3 -->
<file src="..\Source\MQTTnet.Extensions.WebSocket4Net\bin\Release\netstandard1.3\MQTTnet.Extensions.WebSocket4Net.*" target="lib\netstandard1.3\"/>

<!-- .NET Standard 2.0 -->
<file src="..\Source\MQTTnet.Extensions.WebSocket4Net\bin\Release\netstandard2.0\MQTTnet.Extensions.WebSocket4Net.*" target="lib\netstandard2.0\"/>

<!-- Universal Windows -->
<file src="..\Source\MQTTnet.Extensions.WebSocket4Net\bin\Release\uap10.0\MQTTnet.Extensions.WebSocket4Net.*" target="lib\uap10.0\"/>

<!-- .NET Framework -->
<file src="..\Source\MQTTnet.Extensions.WebSocket4Net\bin\Release\net452\MQTTnet.Extensions.WebSocket4Net.*" target="lib\net452\"/>

<!-- .NET Framework 4.6.0 will use binaries from 4.5.2. -->
<file src="..\Source\MQTTnet.Extensions.WebSocket4Net\bin\Release\net461\MQTTnet.Extensions.WebSocket4Net.*" target="lib\net461\"/>

<!-- Everything above .NET Framework 4.6.1 will use the binaries from 4.6.1. -->
</files>
</package>

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -12,6 +12,7 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>
* [Core] Fixed issues in MQTTv5 message encoding and decoding.
* [Core] Added extension method to allow usage of _WebSocket4Net_ in clients to fix issues with AWS and Xamarin.
* [Client] Added support for extended authentication exchange.
* [Client] Exposed MQTTv5 CONNACK values to client.
* [Client] Added _MqttClientSubscribeOptionsBuilder_.


+ 13
- 1
Build/build.ps1 View File

@@ -36,6 +36,14 @@ Write-Host
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"

# Build the WebSocket4Net extension
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"


# Build MQTTnet.Server
Remove-Item ..\Source\MQTTnet.Server\bin\Release\netcoreapp2.2 -Recurse -Force -ErrorAction SilentlyContinue
&$msbuild ..\Source\MQTTnet.Server\MQTTnet.Server.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp2.2" /p:publishprofile=FolderProfile /p:deployonbuild=true /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"
@@ -55,6 +63,8 @@ Copy-Item MQTTnet.Extensions.Rpc.nuspec -Destination MQTTnet.Extensions.Rpc.nusp
(Get-Content MQTTnet.Extensions.Rpc.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.Rpc.nuspec
Copy-Item MQTTnet.Extensions.ManagedClient.nuspec -Destination MQTTnet.Extensions.ManagedClient.nuspec.old -Force
(Get-Content MQTTnet.Extensions.ManagedClient.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.ManagedClient.nuspec
Copy-Item MQTTnet.Extensions.WebSocket4Net.nuspec -Destination MQTTnet.Extensions.WebSocket4Net.nuspec.old -Force
(Get-Content MQTTnet.Extensions.WebSocket4Net.nuspec) -replace '\$nugetVersion', $nugetVersion | Set-Content MQTTnet.Extensions.WebSocket4Net.nuspec

New-Item -ItemType Directory -Force -Path .\NuGet
.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
@@ -62,7 +72,9 @@ New-Item -ItemType Directory -Force -Path .\NuGet
.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion
.\nuget.exe pack MQTTnet.Extensions.WebSocket4Net.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion

Move-Item MQTTnet.AspNetCore.nuspec.old -Destination MQTTnet.AspNetCore.nuspec -Force
Move-Item MQTTnet.Extensions.Rpc.nuspec.old -Destination MQTTnet.Extensions.Rpc.nuspec -Force
Move-Item MQTTnet.Extensions.ManagedClient.nuspec.old -Destination MQTTnet.Extensions.ManagedClient.nuspec -Force
Move-Item MQTTnet.Extensions.ManagedClient.nuspec.old -Destination MQTTnet.Extensions.ManagedClient.nuspec -Force
Move-Item MQTTnet.Extensions.WebSocket4Net.nuspec.old -Destination MQTTnet.Extensions.WebSocket4Net.nuspec -Force

+ 20
- 0
MQTTnet.sln View File

@@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1
Build\MQTTnet.AspNetCore.nuspec = Build\MQTTnet.AspNetCore.nuspec
Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec
Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec
Build\MQTTnet.Extensions.WebSocket4Net.nuspec = Build\MQTTnet.Extensions.WebSocket4Net.nuspec
Build\MQTTnet.NETStandard.nuspec = Build\MQTTnet.NETStandard.nuspec
Build\MQTTnet.nuspec = Build\MQTTnet.nuspec
Build\upload.ps1 = Build\upload.ps1
@@ -50,6 +51,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Server", "Source\MQTTnet.Server\MQTTnet.Server.csproj", "{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.WebSocket4Net", "Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj", "{2BD01D53-4CA5-4142-BE8D-313876395E3E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -245,6 +248,22 @@ Global
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x64.Build.0 = Release|Any CPU
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x86.ActiveCfg = Release|Any CPU
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}.Release|x86.Build.0 = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|ARM.ActiveCfg = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|ARM.Build.0 = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x64.ActiveCfg = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x64.Build.0 = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x86.ActiveCfg = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Debug|x86.Build.0 = Debug|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|Any CPU.Build.0 = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|ARM.ActiveCfg = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|ARM.Build.0 = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x64.ActiveCfg = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x64.Build.0 = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x86.ActiveCfg = Release|Any CPU
{2BD01D53-4CA5-4142-BE8D-313876395E3E}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -261,6 +280,7 @@ Global
{C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{5699FB8C-838C-4AB0-80A5-9CA809F9B65B} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{2BD01D53-4CA5-4142-BE8D-313876395E3E} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}


+ 34
- 0
Source/MQTTnet.Extensions.WebSocket4Net/MQTTnet.Extensions.WebSocket4Net.csproj View File

@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard1.3;netstandard2.0</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' AND '$(MSBuildRuntimeType)' != 'Core'">$(TargetFrameworks);uap10.0</TargetFrameworks>
<Product />
<Company />
<Authors />
<PackageId />
</PropertyGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0'">
<CopyLocalLockFileAssemblies>false</CopyLocalLockFileAssemblies>
<NugetTargetMoniker>UAP,Version=v10.0</NugetTargetMoniker>
<TargetPlatformIdentifier>UAP</TargetPlatformIdentifier>
<TargetPlatformVersion>10.0.17134.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.10240.0</TargetPlatformMinVersion>
<TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier>
<TargetFrameworkVersion>v5.0</TargetFrameworkVersion>
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants>
<DefaultLanguage>en</DefaultLanguage>
<LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="WebSocket4Net" Version="0.15.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\MQTTnet\MQTTnet.csproj" />
</ItemGroup>

</Project>

+ 14
- 0
Source/MQTTnet.Extensions.WebSocket4Net/MqttFactoryExtensions.cs View File

@@ -0,0 +1,14 @@
using System;

namespace MQTTnet.Extensions.WebSocket4Net
{
public static class MqttFactoryExtensions
{
public static IMqttFactory UseWebSocket4Net(this IMqttFactory mqttFactory)
{
if (mqttFactory == null) throw new ArgumentNullException(nameof(mqttFactory));

return mqttFactory.UseClientAdapterFactory(new WebSocket4NetMqttClientAdapterFactory());
}
}
}

+ 231
- 0
Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs View File

@@ -0,0 +1,231 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
using SuperSocket.ClientEngine;
using WebSocket4Net;

namespace MQTTnet.Extensions.WebSocket4Net
{
public class WebSocket4NetMqttChannel : IMqttChannel
{
private readonly BlockingCollection<byte> _receiveBuffer = new BlockingCollection<byte>();

private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientWebSocketOptions _webSocketOptions;

private WebSocket _webSocket;

public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions, MqttClientWebSocketOptions webSocketOptions)
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
_webSocketOptions = webSocketOptions ?? throw new ArgumentNullException(nameof(webSocketOptions));
}

public string Endpoint => _webSocketOptions.Uri;

public bool IsSecureConnection { get; private set; }

public async Task ConnectAsync(CancellationToken cancellationToken)
{
var uri = _webSocketOptions.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) && !uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
{
if (_webSocketOptions.TlsOptions?.UseTls == false)
{
uri = "ws://" + uri;
}
else
{
uri = "wss://" + uri;
}
}

var sslProtocols = _webSocketOptions?.TlsOptions?.SslProtocol ?? SslProtocols.None;
var subProtocol = _webSocketOptions.SubProtocols.FirstOrDefault() ?? string.Empty;

var cookies = new List<KeyValuePair<string, string>>();
if (_webSocketOptions.CookieContainer != null)
{
throw new NotSupportedException("Cookies are not supported.");
}

List<KeyValuePair<string, string>> customHeaders = null;
if (_webSocketOptions.RequestHeaders != null)
{
customHeaders = _webSocketOptions.RequestHeaders.Select(i => new KeyValuePair<string, string>(i.Key, i.Value)).ToList();
}

EndPoint proxy = null;
if (_webSocketOptions.ProxyOptions != null)
{
throw new NotSupportedException("Proxies are not supported.");
}

// The user agent can be empty always because it is just added to the custom headers as "User-Agent".
var userAgent = string.Empty;

var origin = string.Empty;
var webSocketVersion = WebSocketVersion.None;
var receiveBufferSize = 0;

var certificates = new X509CertificateCollection();
if (_webSocketOptions?.TlsOptions?.Certificates != null)
{
foreach (var certificate in _webSocketOptions.TlsOptions.Certificates)
{
certificates.Add(new X509Certificate(certificate));
}
}

_webSocket = new WebSocket(uri, subProtocol, cookies, customHeaders, userAgent, origin, webSocketVersion, proxy, sslProtocols, receiveBufferSize)
{
NoDelay = true,
Security =
{
AllowUnstrustedCertificate = _webSocketOptions?.TlsOptions?.AllowUntrustedCertificates == true,
AllowCertificateChainErrors = _webSocketOptions?.TlsOptions?.IgnoreCertificateChainErrors == true,
Certificates = certificates
}
};

await ConnectInternalAsync(cancellationToken).ConfigureAwait(false);
IsSecureConnection = uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase);
}

public Task DisconnectAsync(CancellationToken cancellationToken)
{
if (_webSocket != null && _webSocket.State == WebSocketState.Open)
{
_webSocket.Close();
}
return Task.FromResult(0);
}

public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var readBytes = 0;
while (count > 0 && !cancellationToken.IsCancellationRequested)
{
if (!_receiveBuffer.TryTake(out var @byte))
{
if (readBytes == 0)
{
// Block until at least one byte was received.
@byte = _receiveBuffer.Take(cancellationToken);
}
else
{
return Task.FromResult(readBytes);
}
}

buffer[offset] = @byte;
offset++;
count--;
readBytes++;
}

return Task.FromResult(readBytes);
}

public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
_webSocket.Send(buffer, offset, count);
return Task.FromResult(0);
}

public void Dispose()
{
if (_webSocket == null)
{
return;
}

_webSocket.DataReceived -= OnDataReceived;
_webSocket.Error -= OnError;
_webSocket.Dispose();
_webSocket = null;
}

private void OnError(object sender, ErrorEventArgs e)
{
System.Diagnostics.Debug.Write(e.Exception.ToString());
}

private void OnDataReceived(object sender, DataReceivedEventArgs e)
{
foreach (var @byte in e.Data)
{
_receiveBuffer.Add(@byte);
}
}

private async Task ConnectInternalAsync(CancellationToken cancellationToken)
{
_webSocket.Error += OnError;
_webSocket.DataReceived += OnDataReceived;

var taskCompletionSource = new TaskCompletionSource<Exception>();

void ErrorHandler(object sender, ErrorEventArgs e)
{
taskCompletionSource.TrySetResult(e.Exception);
}

void SuccessHandler(object sender, EventArgs e)
{
taskCompletionSource.TrySetResult(null);
}

try
{
_webSocket.Opened += SuccessHandler;
_webSocket.Error += ErrorHandler;

_webSocket.Open();

var exception = await MqttTaskTimeout.WaitAsync(c =>
{
c.Register(() => taskCompletionSource.TrySetCanceled());
return taskCompletionSource.Task;
}, _clientOptions.CommunicationTimeout, cancellationToken).ConfigureAwait(false);

if (exception != null)
{
if (exception is AuthenticationException authenticationException)
{
throw new MqttCommunicationException(authenticationException.InnerException);
}

if (exception is OperationCanceledException)
{
throw new MqttCommunicationTimedOutException();
}

throw new MqttCommunicationException(exception);
}
}
catch (OperationCanceledException)
{
throw new MqttCommunicationTimedOutException();
}
finally
{
_webSocket.Opened -= SuccessHandler;
_webSocket.Error -= ErrorHandler;
}
}
}
}

+ 36
- 0
Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs View File

@@ -0,0 +1,36 @@
using System;
using MQTTnet.Adapter;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;

namespace MQTTnet.Extensions.WebSocket4Net
{
public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (logger == null) throw new ArgumentNullException(nameof(logger));

switch (options.ChannelOptions)
{
case MqttClientTcpOptions _:
{
return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}

case MqttClientWebSocketOptions webSocketOptions:
{
return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger);
}

default:
{
throw new NotSupportedException();
}
}
}
}
}

+ 2
- 0
Source/MQTTnet/Client/IMqttClientFactory.cs View File

@@ -5,6 +5,8 @@ namespace MQTTnet.Client
{
public interface IMqttClientFactory
{
IMqttFactory UseClientAdapterFactory(IMqttClientAdapterFactory clientAdapterFactory);

IMqttClient CreateMqttClient();

IMqttClient CreateMqttClient(IMqttNetLogger logger);


+ 21
- 13
Source/MQTTnet/MqttFactory.cs View File

@@ -10,6 +10,8 @@ namespace MQTTnet
{
public class MqttFactory : IMqttFactory
{
private IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory();

public MqttFactory() : this(new MqttNetLogger())
{
}
@@ -21,6 +23,12 @@ namespace MQTTnet

public IMqttNetLogger DefaultLogger { get; }

public IMqttFactory UseClientAdapterFactory(IMqttClientAdapterFactory clientAdapterFactory)
{
_clientAdapterFactory = clientAdapterFactory ?? throw new ArgumentNullException(nameof(clientAdapterFactory));
return this;
}

public IMqttClient CreateMqttClient()
{
return CreateMqttClient(DefaultLogger);
@@ -30,22 +38,22 @@ namespace MQTTnet
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new MqttClient(new MqttClientAdapterFactory(), logger);
return new MqttClient(_clientAdapterFactory, logger);
}

public IMqttClient CreateMqttClient(IMqttClientAdapterFactory adapterFactory)
public IMqttClient CreateMqttClient(IMqttClientAdapterFactory clientAdapterFactory)
{
if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory));
if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory));

return new MqttClient(adapterFactory, DefaultLogger);
return new MqttClient(clientAdapterFactory, DefaultLogger);
}

public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory adapterFactory)
public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactory)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory));
if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory));

return new MqttClient(adapterFactory, logger);
return new MqttClient(clientAdapterFactory, logger);
}

public IMqttServer CreateMqttServer()
@@ -60,19 +68,19 @@ namespace MQTTnet
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger.CreateChildLogger()) }, logger);
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters, IMqttNetLogger logger)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new MqttServer(adapters, logger.CreateChildLogger());
return new MqttServer(serverAdapters, logger.CreateChildLogger());
}

public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters)
public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> serverAdapters)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters));
return new MqttServer(adapters, DefaultLogger.CreateChildLogger());
return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger());
}
}
}

+ 1
- 0
Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj View File

@@ -17,6 +17,7 @@

<ItemGroup>
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" />
</ItemGroup>



+ 4
- 0
Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs View File

@@ -1,10 +1,13 @@
using MQTTnet.Client;
using System;
using System.IO;
using System.Net;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.WebSocket4Net;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using Newtonsoft.Json;
@@ -77,6 +80,7 @@ namespace MQTTnet.TestApp.NetCore
{
Write("Testing '" + name + "'... ", ConsoleColor.Gray);
var factory = new MqttFactory();
//factory.UseWebSocket4Net();
var client = factory.CreateMqttClient();
var topic = Guid.NewGuid().ToString();



+ 4
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj View File

@@ -135,6 +135,10 @@
<Project>{c444e9c8-95fa-430e-9126-274129de16cd}</Project>
<Name>MQTTnet.Extensions.Rpc</Name>
</ProjectReference>
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj">
<Project>{2BD01D53-4CA5-4142-BE8D-313876395E3E}</Project>
<Name>MQTTnet.Extensions.WebSocket4Net</Name>
</ProjectReference>
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj">
<Project>{3587e506-55a2-4eb3-99c7-dc01e42d25d2}</Project>
<Name>MQTTnet</Name>


+ 1
- 0
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml View File

@@ -63,6 +63,7 @@
<StackPanel Grid.Column="1" Grid.Row="6" Orientation="Horizontal">
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="transport">TCP</RadioButton>
<RadioButton x:Name="UseWs" GroupName="transport">WS</RadioButton>
<RadioButton x:Name="UseWs4Net" GroupName="transport">WS (WebSocket4Net)</RadioButton>
<CheckBox x:Name="UseTls">Use TLS</CheckBox>
</StackPanel>



+ 24
- 7
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
@@ -23,6 +21,7 @@ using MQTTnet.Server;
using MQTTnet.Server.Status;
using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
using MQTTnet.Extensions.WebSocket4Net;

namespace MQTTnet.TestApp.UniversalWindows
{
@@ -83,6 +82,8 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Connect(object sender, RoutedEventArgs e)
{
var mqttFactory = new MqttFactory();

var tlsOptions = new MqttClientTlsOptions
{
UseTls = UseTls.IsChecked == true,
@@ -116,6 +117,17 @@ namespace MQTTnet.TestApp.UniversalWindows
};
}

if (UseWs4Net.IsChecked == true)
{
options.ChannelOptions = new MqttClientWebSocketOptions
{
Uri = Server.Text,
TlsOptions = tlsOptions
};

mqttFactory.UseWebSocket4Net();
}

if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
@@ -156,11 +168,9 @@ namespace MQTTnet.TestApp.UniversalWindows
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
}

var factory = new MqttFactory();

if (UseManagedClient.IsChecked == true)
{
_managedMqttClient = factory.CreateManagedMqttClient();
_managedMqttClient = mqttFactory.CreateManagedMqttClient();
_managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
@@ -172,7 +182,7 @@ namespace MQTTnet.TestApp.UniversalWindows
}
else
{
_mqttClient = factory.CreateMqttClient();
_mqttClient = mqttFactory.CreateMqttClient();
_mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
@@ -204,7 +214,7 @@ namespace MQTTnet.TestApp.UniversalWindows

private async Task HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {eventArgs.ApplicationMessage.ConvertPayloadToString()} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";

await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
{
@@ -524,6 +534,13 @@ namespace MQTTnet.TestApp.UniversalWindows
Console.WriteLine();
});

void Handler(MqttApplicationMessageReceivedEventArgs args)
{
//...
}
client.UseApplicationMessageReceivedHandler(Handler);

// Subscribe after connect

client.UseConnectedHandler(async e =>


Loading…
Cancel
Save