Browse Source

Refactor adapter handling.

release/3.x.x
Christian Kratky 4 years ago
parent
commit
b9e9fe64ac
7 changed files with 113 additions and 101 deletions
  1. +4
    -1
      Build/build.ps1
  2. +41
    -34
      Source/MQTTnet/Client/MqttClient.cs
  3. +6
    -6
      Source/MQTTnet/Client/MqttClientExtensions.cs
  4. +7
    -7
      Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs
  5. +1
    -1
      Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs
  6. +9
    -9
      Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs
  7. +45
    -43
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 4
- 1
Build/build.ps1 View File

@@ -12,6 +12,10 @@ Write-Host "Nuget version = $nugetVersion"
Write-Host "MSBuild path = $msbuild" Write-Host "MSBuild path = $msbuild"
Write-Host Write-Host


Invoke-WebRequest -Uri "https://dist.nuget.org/win-x86-commandline/latest/nuget.exe" -OutFile "nuget.exe"

.\nuget.exe restore ..\MQTTnet.sln

# Build and execute tests # Build and execute tests
&$msbuild ..\Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp3.1" /verbosity:m &$msbuild ..\Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp3.1" /verbosity:m
&$msbuild ..\Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp3.1" /verbosity:m &$msbuild ..\Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netcoreapp3.1" /verbosity:m
@@ -51,7 +55,6 @@ vstest.console.exe ..\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp3.1\M
&$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" &$msbuild ..\Source\MQTTnet.Extensions.WebSocket4Net\MQTTnet.Extensions.WebSocket4Net.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx"


# Create NuGet packages. # Create NuGet packages.
Invoke-WebRequest -Uri "https://dist.nuget.org/win-x86-commandline/latest/nuget.exe" -OutFile "nuget.exe"


Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue




+ 41
- 34
Source/MQTTnet/Client/MqttClient.cs View File

@@ -22,25 +22,25 @@ namespace MQTTnet.Client
{ {
public class MqttClient : Disposable, IMqttClient public class MqttClient : Disposable, IMqttClient
{ {
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly Stopwatch _receiveTracker = new Stopwatch();
private readonly object _disconnectLock = new object();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
readonly Stopwatch _sendTracker = new Stopwatch();
readonly Stopwatch _receiveTracker = new Stopwatch();
readonly object _disconnectLock = new object();


private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetLogger _logger;
readonly IMqttClientAdapterFactory _adapterFactory;
readonly IMqttNetLogger _logger;


private CancellationTokenSource _backgroundCancellationTokenSource;
private Task _packetReceiverTask;
private Task _keepAlivePacketsSenderTask;
private Task _publishPacketReceiverTask;
CancellationTokenSource _backgroundCancellationTokenSource;
Task _packetReceiverTask;
Task _keepAlivePacketsSenderTask;
Task _publishPacketReceiverTask;


private AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;
AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;


private IMqttChannelAdapter _adapter;
private bool _cleanDisconnectInitiated;
private long _disconnectGate;
IMqttChannelAdapter _adapter;
bool _cleanDisconnectInitiated;
long _disconnectGate;


public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{ {
@@ -134,6 +134,10 @@ namespace MQTTnet.Client


public async Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken) public async Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken)
{ {
if (options is null) throw new ArgumentNullException(nameof(options));

ThrowIfDisposed();

try try
{ {
_cleanDisconnectInitiated = true; _cleanDisconnectInitiated = true;
@@ -157,6 +161,9 @@ namespace MQTTnet.Client
{ {
if (data == null) throw new ArgumentNullException(nameof(data)); if (data == null) throw new ArgumentNullException(nameof(data));


ThrowIfDisposed();
ThrowIfNotConnected();

return SendAsync(new MqttAuthPacket return SendAsync(new MqttAuthPacket
{ {
Properties = new MqttAuthPacketProperties Properties = new MqttAuthPacketProperties
@@ -230,7 +237,7 @@ namespace MQTTnet.Client
} }
} }


private void Cleanup()
void Cleanup()
{ {
_backgroundCancellationTokenSource?.Cancel(false); _backgroundCancellationTokenSource?.Cancel(false);
_backgroundCancellationTokenSource?.Dispose(); _backgroundCancellationTokenSource?.Dispose();
@@ -240,7 +247,6 @@ namespace MQTTnet.Client
_publishPacketReceiverQueue = null; _publishPacketReceiverQueue = null;


_adapter?.Dispose(); _adapter?.Dispose();
_adapter = null;
} }




@@ -252,10 +258,11 @@ namespace MQTTnet.Client


DisconnectedHandler = null; DisconnectedHandler = null;
} }

base.Dispose(disposing); base.Dispose(disposing);
} }


private async Task<MqttClientAuthenticateResult> AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
async Task<MqttClientAuthenticateResult> AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
{ {
var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket( var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket(
willApplicationMessage, willApplicationMessage,
@@ -274,17 +281,17 @@ namespace MQTTnet.Client
return result; return result;
} }


private void ThrowIfNotConnected()
void ThrowIfNotConnected()
{ {
if (!IsConnected || Interlocked.Read(ref _disconnectGate) == 1) throw new MqttCommunicationException("The client is not connected."); if (!IsConnected || Interlocked.Read(ref _disconnectGate) == 1) throw new MqttCommunicationException("The client is not connected.");
} }


private void ThrowIfConnected(string message)
void ThrowIfConnected(string message)
{ {
if (IsConnected) throw new MqttProtocolViolationException(message); if (IsConnected) throw new MqttProtocolViolationException(message);
} }


private async Task DisconnectInternalAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult)
async Task DisconnectInternalAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult)
{ {
var clientWasConnected = IsConnected; var clientWasConnected = IsConnected;


@@ -337,7 +344,7 @@ namespace MQTTnet.Client
} }
} }


private void TryInitiateDisconnect()
void TryInitiateDisconnect()
{ {
lock (_disconnectLock) lock (_disconnectLock)
{ {
@@ -369,7 +376,7 @@ namespace MQTTnet.Client
return _adapter.SendPacketAsync(packet, Options.CommunicationTimeout, cancellationToken); return _adapter.SendPacketAsync(packet, Options.CommunicationTimeout, cancellationToken);
} }


private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();


@@ -411,7 +418,7 @@ namespace MQTTnet.Client
} }
} }


private async Task TrySendKeepAliveMessagesAsync(CancellationToken cancellationToken)
async Task TrySendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{ {
try try
{ {
@@ -466,7 +473,7 @@ namespace MQTTnet.Client
} }
} }


private async Task TryReceivePacketsAsync(CancellationToken cancellationToken)
async Task TryReceivePacketsAsync(CancellationToken cancellationToken)
{ {
try try
{ {
@@ -526,7 +533,7 @@ namespace MQTTnet.Client
} }
} }


private async Task TryProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
async Task TryProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{ {
try try
{ {
@@ -596,7 +603,7 @@ namespace MQTTnet.Client
} }
} }


private void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket)
void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket)
{ {
try try
{ {
@@ -608,7 +615,7 @@ namespace MQTTnet.Client
} }
} }


private async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken)
async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken)
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
@@ -663,21 +670,21 @@ namespace MQTTnet.Client
} }
} }


private async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false); await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false);
return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(null); return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(null);
} }


private async Task<MqttClientPublishResult> PublishAtLeastOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
async Task<MqttClientPublishResult> PublishAtLeastOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
var response = await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, cancellationToken).ConfigureAwait(false); var response = await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, cancellationToken).ConfigureAwait(false);
return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(response); return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(response);
} }


private async Task<MqttClientPublishResult> PublishExactlyOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
async Task<MqttClientPublishResult> PublishExactlyOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{ {
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();


@@ -694,7 +701,7 @@ namespace MQTTnet.Client
return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket); return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket);
} }


private async Task<bool> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
async Task<bool> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
{ {
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);


@@ -709,7 +716,7 @@ namespace MQTTnet.Client
return true; return true;
} }


private async Task WaitForTaskAsync(Task task, Task sender)
async Task WaitForTaskAsync(Task task, Task sender)
{ {
if (task == null) if (task == null)
{ {
@@ -740,7 +747,7 @@ namespace MQTTnet.Client
} }
} }


private bool DisconnectIsPending()
bool DisconnectIsPending()
{ {
return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0; return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0;
} }


+ 6
- 6
Source/MQTTnet/Client/MqttClientExtensions.cs View File

@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.ExtendedAuthenticationExchange; using MQTTnet.Client.ExtendedAuthenticationExchange;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
@@ -11,6 +7,10 @@ using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing; using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing; using MQTTnet.Client.Unsubscribing;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;


namespace MQTTnet.Client namespace MQTTnet.Client
{ {
@@ -128,7 +128,7 @@ namespace MQTTnet.Client
{ {
if (client == null) throw new ArgumentNullException(nameof(client)); if (client == null) throw new ArgumentNullException(nameof(client));


return client.DisconnectAsync(null);
return client.DisconnectAsync(new MqttClientDisconnectOptions());
} }


public static Task<MqttClientSubscribeResult> SubscribeAsync(this IMqttClient client, params TopicFilter[] topicFilters) public static Task<MqttClientSubscribeResult> SubscribeAsync(this IMqttClient client, params TopicFilter[] topicFilters)


+ 7
- 7
Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs View File

@@ -1,6 +1,4 @@
using System;
using System.Linq;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing; using MQTTnet.Client.Publishing;
@@ -10,6 +8,8 @@ using MQTTnet.Exceptions;
using MQTTnet.Packets; using MQTTnet.Packets;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using System;
using System.Linq;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult; using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;


namespace MQTTnet.Formatter.V3 namespace MQTTnet.Formatter.V3
@@ -153,7 +153,7 @@ namespace MQTTnet.Formatter.V3
{ {
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));
if (unsubAckPacket == null) throw new ArgumentNullException(nameof(unsubAckPacket)); if (unsubAckPacket == null) throw new ArgumentNullException(nameof(unsubAckPacket));
var result = new MqttClientUnsubscribeResult(); var result = new MqttClientUnsubscribeResult();


result.Items.AddRange(unsubscribePacket.TopicFilters.Select((t, i) => result.Items.AddRange(unsubscribePacket.TopicFilters.Select((t, i) =>
@@ -168,7 +168,7 @@ namespace MQTTnet.Formatter.V3


var subscribePacket = new MqttSubscribePacket(); var subscribePacket = new MqttSubscribePacket();
subscribePacket.TopicFilters.AddRange(options.TopicFilters); subscribePacket.TopicFilters.AddRange(options.TopicFilters);
return subscribePacket; return subscribePacket;
} }


@@ -184,9 +184,9 @@ namespace MQTTnet.Formatter.V3


public MqttDisconnectPacket CreateDisconnectPacket(MqttClientDisconnectOptions options) public MqttDisconnectPacket CreateDisconnectPacket(MqttClientDisconnectOptions options)
{ {
if (options != null)
if (options.ReasonCode != MqttClientDisconnectReason.NormalDisconnection || options.ReasonString != null)
{ {
throw new MqttProtocolViolationException("Reason codes for disconnect are only supported for MQTTv5.");
throw new MqttProtocolViolationException("Reason codes and reason string for disconnect are only supported for MQTTv5.");
} }


return new MqttDisconnectPacket(); return new MqttDisconnectPacket();


+ 1
- 1
Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs View File

@@ -26,7 +26,7 @@ namespace MQTTnet.Benchmarks
{ {
if (_useHandler) if (_useHandler)
{ {
eventArgs.TraceMessage.ToString();
eventArgs.LogMessage.ToString();
} }
} }




+ 9
- 9
Tests/MQTTnet.TestApp.NetCore/MqttNetConsoleLogger.cs View File

@@ -1,12 +1,12 @@
using System;
using MQTTnet.Diagnostics;
using System;
using System.Text; using System.Text;
using MQTTnet.Diagnostics;


namespace MQTTnet.TestApp.NetCore namespace MQTTnet.TestApp.NetCore
{ {
public static class MqttNetConsoleLogger public static class MqttNetConsoleLogger
{ {
private static readonly object Lock = new object();
static readonly object _lock = new object();


public static void ForwardToConsole() public static void ForwardToConsole()
{ {
@@ -16,7 +16,7 @@ namespace MQTTnet.TestApp.NetCore


public static void PrintToConsole(string message, ConsoleColor color) public static void PrintToConsole(string message, ConsoleColor color)
{ {
lock (Lock)
lock (_lock)
{ {
var backupColor = Console.ForegroundColor; var backupColor = Console.ForegroundColor;
Console.ForegroundColor = color; Console.ForegroundColor = color;
@@ -25,17 +25,17 @@ namespace MQTTnet.TestApp.NetCore
} }
} }


private static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e)
static void PrintToConsole(object sender, MqttNetLogMessagePublishedEventArgs e)
{ {
var output = new StringBuilder(); var output = new StringBuilder();
output.AppendLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
output.AppendLine($">> [{e.LogMessage.Timestamp:O}] [{e.LogMessage.ThreadId}] [{e.LogMessage.Source}] [{e.LogMessage.Level}]: {e.LogMessage.Message}");
if (e.LogMessage.Exception != null)
{ {
output.AppendLine(e.TraceMessage.Exception.ToString());
output.AppendLine(e.LogMessage.Exception.ToString());
} }


var color = ConsoleColor.Red; var color = ConsoleColor.Red;
switch (e.TraceMessage.Level)
switch (e.LogMessage.Level)
{ {
case MqttNetLogLevel.Error: case MqttNetLogLevel.Error:
color = ConsoleColor.Red; color = ConsoleColor.Red;


+ 45
- 43
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -6,6 +6,7 @@ using MQTTnet.Diagnostics;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Extensions.ManagedClient; using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Extensions.Rpc; using MQTTnet.Extensions.Rpc;
using MQTTnet.Extensions.Rpc.Options;
using MQTTnet.Extensions.WebSocket4Net; using MQTTnet.Extensions.WebSocket4Net;
using MQTTnet.Formatter; using MQTTnet.Formatter;
using MQTTnet.Implementations; using MQTTnet.Implementations;
@@ -20,8 +21,6 @@ using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates; using Windows.Security.Cryptography.Certificates;
using Windows.UI.Core; using Windows.UI.Core;
using Windows.UI.Xaml; using Windows.UI.Xaml;
using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;


namespace MQTTnet.TestApp.UniversalWindows namespace MQTTnet.TestApp.UniversalWindows
{ {
@@ -45,7 +44,7 @@ namespace MQTTnet.TestApp.UniversalWindows


private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e) private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
{ {
_traceMessages.Enqueue(e.TraceMessage);
_traceMessages.Enqueue(e.LogMessage);
await UpdateLogAsync(); await UpdateLogAsync();
} }


@@ -164,16 +163,16 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
await _mqttClient.DisconnectAsync(); await _mqttClient.DisconnectAsync();
_mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());
} }


if (UseManagedClient.IsChecked == true) if (UseManagedClient.IsChecked == true)
{ {
_managedMqttClient = mqttFactory.CreateManagedMqttClient(); _managedMqttClient = mqttFactory.CreateManagedMqttClient();
_managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _managedMqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
_managedMqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
_managedMqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());


await _managedMqttClient.StartAsync(new ManagedMqttClientOptions await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
{ {
@@ -184,8 +183,8 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
_mqttClient = mqttFactory.CreateMqttClient(); _mqttClient = mqttFactory.CreateMqttClient();
_mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _mqttClient.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected(x));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected(x));
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());


await _mqttClient.ConnectAsync(options); await _mqttClient.ConnectAsync(options);
} }
@@ -196,7 +195,7 @@ namespace MQTTnet.TestApp.UniversalWindows
} }
} }


private void OnDisconnected(MqttClientDisconnectedEventArgs e)
private void OnDisconnected()
{ {
_traceMessages.Enqueue(new MqttNetLogMessage _traceMessages.Enqueue(new MqttNetLogMessage
{ {
@@ -209,7 +208,7 @@ namespace MQTTnet.TestApp.UniversalWindows
Task.Run(UpdateLogAsync); Task.Run(UpdateLogAsync);
} }


private void OnConnected(MqttClientConnectedEventArgs e)
private void OnConnected()
{ {
_traceMessages.Enqueue(new MqttNetLogMessage _traceMessages.Enqueue(new MqttNetLogMessage
{ {
@@ -250,7 +249,7 @@ namespace MQTTnet.TestApp.UniversalWindows
qos = MqttQualityOfServiceLevel.ExactlyOnce; qos = MqttQualityOfServiceLevel.ExactlyOnce;
} }


var payload = new byte[0];
var payload = Array.Empty<byte>();
if (PlainText.IsChecked == true) if (PlainText.IsChecked == true)
{ {
payload = Encoding.UTF8.GetBytes(Payload.Text); payload = Encoding.UTF8.GetBytes(Payload.Text);
@@ -433,7 +432,7 @@ namespace MQTTnet.TestApp.UniversalWindows
qos = MqttQualityOfServiceLevel.ExactlyOnce; qos = MqttQualityOfServiceLevel.ExactlyOnce;
} }


var payload = new byte[0];
var payload = Array.Empty<byte>();
if (RpcText.IsChecked == true) if (RpcText.IsChecked == true)
{ {
payload = Encoding.UTF8.GetBytes(RpcPayload.Text); payload = Encoding.UTF8.GetBytes(RpcPayload.Text);
@@ -446,7 +445,7 @@ namespace MQTTnet.TestApp.UniversalWindows


try try
{ {
var rpcClient = new MqttRpcClient(_mqttClient);
var rpcClient = new MqttRpcClient(_mqttClient, new MqttRpcClientOptions());
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos); var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos);


RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response)); RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response));
@@ -491,7 +490,9 @@ namespace MQTTnet.TestApp.UniversalWindows


#region Wiki Code #region Wiki Code


#pragma warning disable IDE0051 // Remove unused private members
private async Task WikiCode() private async Task WikiCode()
#pragma warning restore IDE0051 // Remove unused private members
{ {
{ {
// Use a custom identifier for the trace messages. // Use a custom identifier for the trace messages.
@@ -627,30 +628,31 @@ namespace MQTTnet.TestApp.UniversalWindows


// ---------------------------------- // ----------------------------------
{ {
var options = new MqttServerOptions();

options.ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
var options = new MqttServerOptions
{ {
if (c.ClientId.Length < 10)
ConnectionValidator = new MqttServerConnectionValidatorDelegate(c =>
{ {
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
return;
}
if (c.ClientId.Length < 10)
{
c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
return;
}


if (c.Username != "mySecretUser")
{
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
return;
}
if (c.Username != "mySecretUser")
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}


if (c.Password != "mySecretPassword")
{
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
return;
}
if (c.Password != "mySecretPassword")
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}


c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
});
c.ReasonCode = MqttConnectReasonCode.Success;
})
};


var factory = new MqttFactory(); var factory = new MqttFactory();
var mqttServer = factory.CreateMqttServer(); var mqttServer = factory.CreateMqttServer();
@@ -672,7 +674,7 @@ namespace MQTTnet.TestApp.UniversalWindows
return new[] { ChainValidationResult.Revoked }; return new[] { ChainValidationResult.Revoked };
} }


return new ChainValidationResult[0];
return Array.Empty<ChainValidationResult>();
}; };


{ {
@@ -698,11 +700,11 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
if (c.ClientId != "Highlander") if (c.ClientId != "Highlander")
{ {
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
return; return;
} }


c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
c.ReasonCode = MqttConnectReasonCode.Success;
}); });


var mqttServer = new MqttFactory().CreateMqttServer(); var mqttServer = new MqttFactory().CreateMqttServer();
@@ -717,23 +719,23 @@ namespace MQTTnet.TestApp.UniversalWindows
{ {
if (c.ClientId.Length < 10) if (c.ClientId.Length < 10)
{ {
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
return; return;
} }


if (c.Username != "mySecretUser") if (c.Username != "mySecretUser")
{ {
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return; return;
} }


if (c.Password != "mySecretPassword") if (c.Password != "mySecretPassword")
{ {
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return; return;
} }


c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
c.ReasonCode = MqttConnectReasonCode.Success;
}) })
}; };
} }
@@ -816,10 +818,10 @@ namespace MQTTnet.TestApp.UniversalWindows
// Write all trace messages to the console window. // Write all trace messages to the console window.
MqttNetGlobalLogger.LogMessagePublished += (s, e) => MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
{ {
var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
if (e.TraceMessage.Exception != null)
var trace = $">> [{e.LogMessage.Timestamp:O}] [{e.LogMessage.ThreadId}] [{e.LogMessage.Source}] [{e.LogMessage.Level}]: {e.LogMessage.Message}";
if (e.LogMessage.Exception != null)
{ {
trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
trace += Environment.NewLine + e.LogMessage.Exception.ToString();
} }


Console.WriteLine(trace); Console.WriteLine(trace);


Loading…
Cancel
Save