Просмотр исходного кода

Merge branch 'develop'

release/3.x.x
Christian Kratky 6 лет назад
Родитель
Сommit
0bdb7e9e44
34 измененных файлов: 576 добавлений и 197 удалений
  1. +3
    -2
      Build/MQTTnet.AspNetCore.nuspec
  2. +3
    -2
      Build/MQTTnet.Extensions.ManagedClient.nuspec
  3. +3
    -2
      Build/MQTTnet.Extensions.Rpc.nuspec
  4. +6
    -27
      Build/MQTTnet.nuspec
  5. +10
    -4
      Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs
  6. +9
    -0
      Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
  7. +2
    -1
      Source/MQTTnet.AspnetCore/ReaderExtensions.cs
  8. +1
    -1
      Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
  9. +2
    -0
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs
  10. +1
    -1
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  11. +2
    -0
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs
  12. +1
    -1
      Source/MQTTnet.Extensions.Rpc/SampleCCode.c
  13. +134
    -0
      Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs
  14. +16
    -15
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  15. +5
    -1
      Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs
  16. +28
    -21
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  17. +8
    -0
      Source/MQTTnet/Internal/ExceptionHelper.cs
  18. +5
    -5
      Source/MQTTnet/Internal/TestMqttChannel.cs
  19. +2
    -2
      Source/MQTTnet/MQTTnet.csproj
  20. +10
    -3
      Source/MQTTnet/MqttFactory.cs
  21. +10
    -7
      Source/MQTTnet/Serializer/MqttPacketBodyReader.cs
  22. +68
    -17
      Source/MQTTnet/Serializer/MqttPacketReader.cs
  23. +0
    -2
      Source/MQTTnet/Serializer/MqttPacketWriter.cs
  24. +8
    -2
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  25. +4
    -3
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  26. +4
    -1
      Source/MQTTnet/Server/MqttConnectionValidatorContext.cs
  27. +1
    -1
      Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs
  28. +52
    -4
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  29. +4
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  30. +7
    -2
      Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs
  31. +4
    -3
      Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
  32. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  33. +88
    -39
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  34. +74
    -26
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 3
- 2
Build/MQTTnet.AspNetCore.nuspec Просмотреть файл

@@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.0" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.0.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.0.1" />


+ 3
- 2
Build/MQTTnet.Extensions.ManagedClient.nuspec Просмотреть файл

@@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which provides a managed MQTT client with additional features using MQTTnet.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
</dependencies>
</metadata>



+ 3
- 2
Build/MQTTnet.Extensions.Rpc.nuspec Просмотреть файл

@@ -10,12 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is an extension library which allows executing synchronous device calls including a response using MQTTnet.</description>
<releaseNotes>* Updated to MQTTnet 2.8.0.
<releaseNotes>* Updated to MQTTnet 2.8.1.
* For more release notes please check the MQTTnet release notes.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.0" />
<dependency id="MQTTnet" version="2.8.1" />
</dependencies>
</metadata>



+ 6
- 27
Build/MQTTnet.nuspec Просмотреть файл

@@ -10,33 +10,12 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> ** MQTTnet is now available at Open Collective for donations (https://opencollective.com/mqttnet). **
* [Core] Performance optimizations.
* [Core] Due to performance reasons the timestamp of log messages is now in UTC format.
* [Core] Added several packet validations.
* [Core] Log messages now contain the complete source path including parent components.
* [Core] The adapter now has an _Endpoint_ definition as string containing remote IP and port.
* [Client] Received messages are now processed completely in the worker thread without creating new Tasks.
* [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger)
* [Client] A clean disconnect (via DisconnectAsync) will no longer throw an exception.
* [Client] Added new overloads for quick message publishing.
* [ManagedClient] The managed client is moved to a separate nuget package.
* [ManagedClient] Added an own message format with extended properties like ID (BREAKING CHANGE).
* [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta).
* [ManagedClient] Added a new event which is fired when a synchronization of the subscriptions has failed.
* [ManagedClient] Added a new event which is fired when a connection attempt has failed.
* [ManagedClient] Exposed a new property which provides the count of not published messages (pending messages count).
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client.
* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan)
* [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely.
* [Server] Rewritten the _ConnectedClients_ API and added new features for disconnecting and Endpoint information (IP etc.).
* [Server] Added settings for disabling persistent sessions and defining a max pending messages queue size per session.
* [Server] Persistent sessions are disabled by default (BREAKING CHANGE!).
* [Server] Added a new interceptor which is invoked before a new message is added to the client queue.
* [Server] Added support for Linux servers by dividing IPv4 and IPv6 support and adding new options (BREAKING CHANGE!).
* [Server] Gracefully closed connections are no longer reported as warnings.
* [Server] Added new overloads for initializing the ASP.NET Core integration.
<releaseNotes>* [Core] Performance optimizations.
* [Core] Fixed a bug which prevents receiving large packets (UWP only)
* [Client] The ManagedClient options now allow configuring the interval for connection checks.
* [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback.
* [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_.
* [Server] Fix a bug in the keep alive monitor which caused high CPU load (thanks to @GarageGadget).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 10
- 4
Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs Просмотреть файл

@@ -3,6 +3,7 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using MQTTnet.Server;
using System.Collections.Generic;

namespace MQTTnet.AspNetCore
{
@@ -23,10 +24,7 @@ namespace MQTTnet.AspNetCore

if (context.Request.Headers.TryGetValue("Sec-WebSocket-Protocol", out var requestedSubProtocolValues))
{
// Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc.
subProtocol = requestedSubProtocolValues
.OrderByDescending(p => p.Length)
.FirstOrDefault(p => p.ToLower().StartsWith("mqtt"));
subProtocol = SelectSubProtocol(requestedSubProtocolValues);
}

var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>();
@@ -40,6 +38,14 @@ namespace MQTTnet.AspNetCore
return app;
}

public static string SelectSubProtocol(IList<string> requestedSubProtocolValues)
{
// Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc.
return requestedSubProtocolValues
.OrderByDescending(p => p.Length)
.FirstOrDefault(p => p.ToLower().StartsWith("mqtt"));
}

public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action<IMqttServer> configure)
{
var server = app.ApplicationServices.GetRequiredService<IMqttServer>();


+ 9
- 0
Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs Просмотреть файл

@@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using MQTTnet.Adapter;
using MQTTnet.Serializer;
using MQTTnet.Server;
@@ -13,6 +14,14 @@ namespace MQTTnet.AspNetCore

public override async Task OnConnectedAsync(ConnectionContext connection)
{
// required for websocket transport to work
var transferFormatFeature = connection.Features.Get<ITransferFormatFeature>();
if (transferFormatFeature != null)
{
transferFormatFeature.ActiveFormat = TransferFormat.Binary;
}


var serializer = new MqttPacketSerializer();
using (var adapter = new MqttConnectionContext(serializer, connection))
{


+ 2
- 1
Source/MQTTnet.AspnetCore/ReaderExtensions.cs Просмотреть файл

@@ -32,7 +32,8 @@ namespace MQTTnet.AspNetCore
}

var bodySlice = copy.Slice(0, bodyLength);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray(), 0)));
var buffer = bodySlice.GetArray();
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(buffer, 0, buffer.Length)));
consumed = bodySlice.End;
observed = bodySlice.End;
return true;


+ 1
- 1
Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs Просмотреть файл

@@ -44,7 +44,7 @@ namespace MQTTnet.AspNetCore
return services;
}

public static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
private static IServiceCollection AddHostedMqttServer(this IServiceCollection services)
{
var logger = new MqttNetLogger();
var childLogger = logger.CreateChildLogger();


+ 2
- 0
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs Просмотреть файл

@@ -9,6 +9,8 @@ namespace MQTTnet.Extensions.ManagedClient

TimeSpan AutoReconnectDelay { get; }

TimeSpan ConnectionCheckInterval { get; }

IManagedMqttClientStorage Storage { get; }
}
}

+ 1
- 1
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs Просмотреть файл

@@ -206,7 +206,7 @@ namespace MQTTnet.Extensions.ManagedClient

if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)


+ 2
- 0
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs Просмотреть файл

@@ -9,6 +9,8 @@ namespace MQTTnet.Extensions.ManagedClient

public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1);

public IManagedMqttClientStorage Storage { get; set; }
}
}

+ 1
- 1
Source/MQTTnet.Extensions.Rpc/SampleCCode.c Просмотреть файл

@@ -3,7 +3,7 @@ _mqttClient.subscribe("MQTTnet.RPC/+/ping");
_mqttClient.subscribe("MQTTnet.RPC/+/do_something");

// It is not allowed to change the structure of the topic. Otherwise RPC will not work. So method names can be separated using
// an _ or . but no +, # or . If it is required to distinguish between devices own rules can be defined like the following.
// an _ or . but no +, # or /. If it is required to distinguish between devices own rules can be defined like the following.
_mqttClient.subscribe("MQTTnet.RPC/+/deviceA.ping");
_mqttClient.subscribe("MQTTnet.RPC/+/deviceB.ping");
_mqttClient.subscribe("MQTTnet.RPC/+/deviceC.getTemperature");


+ 134
- 0
Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs Просмотреть файл

@@ -0,0 +1,134 @@
using MQTTnet.Client;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Channel;
using MQTTnet.Diagnostics;
using MQTTnet.Serializer;
using WebSocket4Net;

namespace MQTTnet.TestApp.NetCore
{
public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory
{
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (logger == null) throw new ArgumentNullException(nameof(logger));

if (!(options.ChannelOptions is MqttClientWebSocketOptions))
{
throw new NotSupportedException("Only WebSocket connections are supported.");
}

return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options), new MqttPacketSerializer(), logger);
}

private class WebSocket4NetMqttChannel : IMqttChannel
{
private readonly BlockingCollection<byte> _receiveBuffer = new BlockingCollection<byte>();

private readonly IMqttClientOptions _clientOptions;
private WebSocket4Net.WebSocket _webSocket;

public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions)
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
}

public string Endpoint { get; } = "";

public Task ConnectAsync(CancellationToken cancellationToken)
{
var channelOptions = (MqttClientWebSocketOptions)_clientOptions.ChannelOptions;

var uri = "ws://" + channelOptions.Uri;
var sslProtocols = SslProtocols.None;

if (channelOptions.TlsOptions.UseTls)
{
uri = "wss://" + channelOptions.Uri;
sslProtocols = SslProtocols.Tls12;
}

var subProtocol = channelOptions.SubProtocols.FirstOrDefault() ?? string.Empty;

_webSocket = new WebSocket4Net.WebSocket(uri, subProtocol, sslProtocols: sslProtocols);
_webSocket.DataReceived += OnDataReceived;
_webSocket.Open();
SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Open, _clientOptions.CommunicationTimeout);

return Task.FromResult(0);
}

public Task DisconnectAsync()
{
if (_webSocket != null)
{
_webSocket.DataReceived -= OnDataReceived;
_webSocket.Close();
SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Closed, _clientOptions.CommunicationTimeout);
}

_webSocket = null;

return Task.FromResult(0);
}

public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var readBytes = 0;
while (count > 0 && !cancellationToken.IsCancellationRequested)
{
byte @byte;
if (readBytes == 0)
{
// Block until at lease one byte was received.
@byte = _receiveBuffer.Take(cancellationToken);
}
else
{
if (!_receiveBuffer.TryTake(out @byte))
{
return Task.FromResult(readBytes);
}
}
buffer[offset] = @byte;
offset++;
count--;
readBytes++;
}

return Task.FromResult(readBytes);
}

public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
_webSocket.Send(buffer, offset, count);
return Task.FromResult(0);
}

public void Dispose()
{
if (_webSocket != null)
{
_webSocket.DataReceived -= OnDataReceived;
_webSocket.Dispose();
}
}

private void OnDataReceived(object sender, WebSocket4Net.DataReceivedEventArgs e)
{
foreach (var @byte in e.Data)
{
_receiveBuffer.Add(@byte);
}
}
}
}
}

+ 16
- 15
Source/MQTTnet/Adapter/MqttChannelAdapter.cs Просмотреть файл

@@ -97,13 +97,11 @@ namespace MQTTnet.Adapter
await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
_logger.Verbose("TX >>> {0}", packet);

var packetData = PacketSerializer.Serialize(packet);

await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);

PacketSerializer.FreeBuffer();

_logger.Verbose("TX >>> {0}", packet);
}
catch (Exception exception)
{
@@ -130,11 +128,11 @@ namespace MQTTnet.Adapter

if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false);
}
else
{
receivedMqttPacket = await ReceiveAsync(_channel, cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false);
}

if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
@@ -149,7 +147,7 @@ namespace MQTTnet.Adapter
}

_logger.Verbose("RX <<< {0}", packet);
return packet;
}
catch (Exception exception)
@@ -165,9 +163,9 @@ namespace MQTTnet.Adapter
return null;
}

private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken)
private async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false);
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(_channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false);

try
{
@@ -190,18 +188,21 @@ namespace MQTTnet.Adapter
chunkSize = bytesLeft;
}

#if WINDOWS_UWP
var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
#else
// async/await is not used to avoid the overhead of context switches. We assume that the reamining data
// has been sent from the sender directly after the initial bytes.
var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).GetAwaiter().GetResult();
if (readBytes <= 0)
{
ExceptionHelper.ThrowGracefulSocketClose();
}
var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
#endif
cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowIfGracefulSocketClose(readBytes);

bodyOffset += readBytes;
} while (bodyOffset < body.Length);

return new ReceivedMqttPacket(fixedHeader.Flags, new MqttPacketBodyReader(body, 0));
return new ReceivedMqttPacket(fixedHeader.Flags, new MqttPacketBodyReader(body, 0, body.Length));
}
finally
{


+ 5
- 1
Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs Просмотреть файл

@@ -154,7 +154,11 @@ namespace MQTTnet.Implementations

private void CreateStreams()
{
_readStream = _socket.InputStream.AsStreamForRead(_bufferSize);
// Attention! Do not set the buffer for the read method. This will
// limit the internal buffer and the read operation will hang forever
// if more data than the buffer size was received.
_readStream = _socket.InputStream.AsStreamForRead();

_writeStream = _socket.OutputStream.AsStreamForWrite(_bufferSize);
}



+ 28
- 21
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs Просмотреть файл

@@ -1,6 +1,7 @@
#if !WINDOWS_UWP
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
@@ -79,27 +80,33 @@ namespace MQTTnet.Implementations

private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate)
{
var listenerV4 = new MqttTcpServerListener(
AddressFamily.InterNetwork,
options,
tlsCertificate,
_cancellationTokenSource.Token,
_logger);

listenerV4.ClientAccepted += OnClientAccepted;
listenerV4.Start();
_listeners.Add(listenerV4);

var listenerV6 = new MqttTcpServerListener(
AddressFamily.InterNetworkV6,
options,
tlsCertificate,
_cancellationTokenSource.Token,
_logger);

listenerV6.ClientAccepted += OnClientAccepted;
listenerV6.Start();
_listeners.Add(listenerV6);
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None))
{
var listenerV4 = new MqttTcpServerListener(
AddressFamily.InterNetwork,
options,
tlsCertificate,
_cancellationTokenSource.Token,
_logger);

listenerV4.ClientAccepted += OnClientAccepted;
listenerV4.Start();
_listeners.Add(listenerV4);
}

if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None))
{
var listenerV6 = new MqttTcpServerListener(
AddressFamily.InterNetworkV6,
options,
tlsCertificate,
_cancellationTokenSource.Token,
_logger);

listenerV6.ClientAccepted += OnClientAccepted;
listenerV6.Start();
_listeners.Add(listenerV6);
}
}

private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs e)


+ 8
- 0
Source/MQTTnet/Internal/ExceptionHelper.cs Просмотреть файл

@@ -8,5 +8,13 @@ namespace MQTTnet.Internal
{
throw new MqttCommunicationClosedGracefullyException();
}

public static void ThrowIfGracefulSocketClose(int readBytesCount)
{
if (readBytesCount <= 0)
{
throw new MqttCommunicationClosedGracefullyException();
}
}
}
}

+ 5
- 5
Source/MQTTnet/Internal/TestMqttChannel.cs Просмотреть файл

@@ -14,11 +14,7 @@ namespace MQTTnet.Internal
_stream = stream;
}

public void Dispose()
{
}

public string Endpoint { get; }
public string Endpoint { get; } = "<Test channel>";

public Task ConnectAsync(CancellationToken cancellationToken)
{
@@ -39,5 +35,9 @@ namespace MQTTnet.Internal
{
return _stream.WriteAsync(buffer, offset, count, cancellationToken);
}

public void Dispose()
{
}
}
}

+ 2
- 2
Source/MQTTnet/MQTTnet.csproj Просмотреть файл

@@ -20,7 +20,7 @@
<NugetTargetMoniker>UAP,Version=v10.0</NugetTargetMoniker>
<TargetPlatformIdentifier>UAP</TargetPlatformIdentifier>
<TargetPlatformVersion>10.0.17134.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.10240.0</TargetPlatformMinVersion>
<TargetPlatformMinVersion>10.0.10586.212</TargetPlatformMinVersion>
<TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier>
<TargetFrameworkVersion>v5.0</TargetFrameworkVersion>
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants>
@@ -53,7 +53,7 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='uap10.0'">
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="6.1.4" />
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform" Version="6.1.5" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net452'">


+ 10
- 3
Source/MQTTnet/MqttFactory.cs Просмотреть файл

@@ -22,12 +22,19 @@ namespace MQTTnet
return new MqttClient(new MqttClientAdapterFactory(), logger);
}

public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory mqttClientAdapterFactory)
public IMqttClient CreateMqttClient(IMqttClientAdapterFactory adapterFactory)
{
if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory));

return new MqttClient(adapterFactory, new MqttNetLogger());
}

public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory adapterFactory)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
if (mqttClientAdapterFactory == null) throw new ArgumentNullException(nameof(mqttClientAdapterFactory));
if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory));

return new MqttClient(mqttClientAdapterFactory, logger);
return new MqttClient(adapterFactory, logger);
}

public IMqttServer CreateMqttServer()


+ 10
- 7
Source/MQTTnet/Serializer/MqttPacketBodyReader.cs Просмотреть файл

@@ -6,17 +6,20 @@ namespace MQTTnet.Serializer
public class MqttPacketBodyReader
{
private readonly byte[] _buffer;
private int _offset;
private readonly int _length;

public MqttPacketBodyReader(byte[] buffer, int offset)
private int _offset;
public MqttPacketBodyReader(byte[] buffer, int offset, int length)
{
_buffer = buffer;
_offset = offset;
_length = length;
}

public int Length => _buffer.Length - _offset;
public int Length => _length - _offset;

public bool EndOfStream => _offset == _buffer.Length;
public bool EndOfStream => _offset == _length;

public byte ReadByte()
{
@@ -26,7 +29,7 @@ namespace MQTTnet.Serializer

public ArraySegment<byte> ReadRemainingData()
{
return new ArraySegment<byte>(_buffer, _offset, _buffer.Length - _offset);
return new ArraySegment<byte>(_buffer, _offset, _length - _offset);
}

public ushort ReadUInt16()
@@ -53,9 +56,9 @@ namespace MQTTnet.Serializer

private void ValidateReceiveBuffer(ushort length)
{
if (_buffer.Length < _offset + length)
if (_length < _offset + length)
{
throw new ArgumentOutOfRangeException(nameof(_buffer), $"expected at least {_offset + length} bytes but there are only {_buffer.Length} bytes");
throw new ArgumentOutOfRangeException(nameof(_buffer), $"expected at least {_offset + length} bytes but there are only {_length} bytes");
}
}



+ 68
- 17
Source/MQTTnet/Serializer/MqttPacketReader.cs Просмотреть файл

@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Exceptions;
@@ -21,11 +20,9 @@ namespace MQTTnet.Serializer
while (totalBytesRead < buffer.Length)
{
var bytesRead = await channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false);
if (bytesRead <= 0)
{
cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowGracefulSocketClose();
}

cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowIfGracefulSocketClose(bytesRead);

totalBytesRead += bytesRead;
}
@@ -35,50 +32,104 @@ namespace MQTTnet.Serializer
{
return new MqttFixedHeader(buffer[0], 0);
}

#if WINDOWS_UWP
// UWP will have a dead lock when calling this not async.
var bodyLength = await ReadBodyLengthAsync(channel, buffer[1], singleByteBuffer, cancellationToken).ConfigureAwait(false);
#else
// Here the async/await pattern is not used becuase the overhead of context switches
// is too big for reading 1 byte in a row. We expect that the remaining data was sent
// directly after the initial bytes. If the client disconnects just in this moment we
// will get an exception anyway.
var bodyLength = ReadBodyLength(channel, buffer[1], singleByteBuffer, cancellationToken);
#endif

return new MqttFixedHeader(buffer[0], bodyLength);
}

#if !WINDOWS_UWP
private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html.
var offset = 0;
var multiplier = 128;
var value = initialEncodedByte & 127;
int encodedByte = initialEncodedByte;

while ((encodedByte & 128) != 0)
{
offset++;
if (offset > 3)
{
throw new MqttProtocolViolationException("Remaining length is invalid.");
}

cancellationToken.ThrowIfCancellationRequested();

// Here the async/await pattern is not used becuase the overhead of context switches
// is too big for reading 1 byte in a row. We expect that the remaining data was sent
// directly after the initial bytes. If the client disconnects just in this moment we
// will get an exception anyway.
encodedByte = ReadByte(channel, singleByteBuffer, cancellationToken);

value += (byte)(encodedByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128)
multiplier *= 128;
}

return value;
}

private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();

cancellationToken.ThrowIfCancellationRequested();

if (readCount <= 0)
{
ExceptionHelper.ThrowGracefulSocketClose();
}

return singleByteBuffer[0];
}

#else
private static async Task<int> ReadBodyLengthAsync(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var offset = 0;
var multiplier = 128;
var value = initialEncodedByte & 127;
int encodedByte = initialEncodedByte;

while ((encodedByte & 128) != 0)
{
offset++;
if (offset > 3)
{
throw new MqttProtocolViolationException("Remaining length is invalid.");
}

cancellationToken.ThrowIfCancellationRequested();

encodedByte = await ReadByteAsync(channel, singleByteBuffer, cancellationToken).ConfigureAwait(false);

value += (byte)(encodedByte & 127) * multiplier;
multiplier *= 128;
}

return value;
}

private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
private static async Task<byte> ReadByteAsync(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult();
var readCount = await channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

if (readCount <= 0)
{
cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowGracefulSocketClose();
}

return singleByteBuffer[0];
}

#endif
}
}

+ 0
- 2
Source/MQTTnet/Serializer/MqttPacketWriter.cs Просмотреть файл

@@ -29,7 +29,6 @@ namespace MQTTnet.Serializer

public static ArraySegment<byte> EncodeRemainingLength(int length)
{
// write the encoded remaining length right aligned on the 4 byte buffer
if (length <= 0)
{
return new ArraySegment<byte>(new byte[1], 0, 1);
@@ -38,7 +37,6 @@ namespace MQTTnet.Serializer
var buffer = new byte[4];
var bufferOffset = 0;

// Algorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = length;
do
{


+ 8
- 2
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs Просмотреть файл

@@ -70,7 +70,9 @@ namespace MQTTnet.Server
while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D)
// If the client sends 5 sec. the server will allow up to 7.5 seconds.
// If the client sends 1 sec. the server will allow up to 1.5 seconds.
if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D)
{
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId);
_clientSession.Stop(MqttClientDisconnectType.NotClean);
@@ -78,7 +80,11 @@ namespace MQTTnet.Server
return;
}

await Task.Delay(keepAlivePeriod, cancellationToken).ConfigureAwait(false);
// The server checks the keep alive timeout every 50 % of the overall keep alive timeout
// because the server allows 1.5 times the keep alive value. This means that a value of 5 allows
// up to 7.5 seconds. With an interval of 2.5 (5 / 2) the 7.5 is also affected. Waiting the whole
// keep alive time will hit at 10 instead of 7.5 (but only one time instead of two times).
await Task.Delay(TimeSpan.FromSeconds(keepAlivePeriod * 0.5D), cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)


+ 4
- 3
Source/MQTTnet/Server/MqttClientSessionsManager.cs Просмотреть файл

@@ -212,7 +212,7 @@ namespace MQTTnet.Server
// Switch to the required protocol version before sending any response.
clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;

var connectReturnCode = ValidateConnection(connectPacket);
var connectReturnCode = ValidateConnection(connectPacket, clientAdapter);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await clientAdapter.SendPacketAsync(
@@ -268,7 +268,7 @@ namespace MQTTnet.Server
}
}

private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter)
{
if (_options.ConnectionValidator == null)
{
@@ -279,7 +279,8 @@ namespace MQTTnet.Server
connectPacket.ClientId,
connectPacket.Username,
connectPacket.Password,
connectPacket.WillMessage);
connectPacket.WillMessage,
clientAdapter.Endpoint);

_options.ConnectionValidator(context);
return context.ReturnCode;


+ 4
- 1
Source/MQTTnet/Server/MqttConnectionValidatorContext.cs Просмотреть файл

@@ -4,12 +4,13 @@ namespace MQTTnet.Server
{
public class MqttConnectionValidatorContext
{
public MqttConnectionValidatorContext(string clientId, string username, string password, MqttApplicationMessage willMessage)
public MqttConnectionValidatorContext(string clientId, string username, string password, MqttApplicationMessage willMessage, string endpoint)
{
ClientId = clientId;
Username = username;
Password = password;
WillMessage = willMessage;
Endpoint = endpoint;
}

public string ClientId { get; }
@@ -20,6 +21,8 @@ namespace MQTTnet.Server

public MqttApplicationMessage WillMessage { get; }

public string Endpoint { get; }

public MqttConnectReturnCode ReturnCode { get; set; } = MqttConnectReturnCode.ConnectionAccepted;
}
}

+ 1
- 1
Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs Просмотреть файл

@@ -58,7 +58,7 @@ namespace MQTTnet.Benchmarks

var receivedPacket = new ReceivedMqttPacket(
header.Flags,
new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength));
new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength, _serializedPacket.Array.Length));

_serializer.Deserialize(receivedPacket);
}


+ 52
- 4
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs Просмотреть файл

@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -149,6 +150,53 @@ namespace MQTTnet.Core.Tests
DeserializeAndCompare(p, "IAIABQ==", MqttProtocolVersion.V310);
}

[TestMethod]
public void Serialize_LargePacket()
{
var serializer = new MqttPacketSerializer { ProtocolVersion = MqttProtocolVersion.V311 };

const int payloadLength = 80000;

var payload = new byte[payloadLength];

var value = 0;
for (var i = 0; i < payloadLength; i++)
{
if (value > 255)
{
value = 0;
}

payload[i] = (byte)value;
}

var publishPacket = new MqttPublishPacket
{
Topic = "abcdefghijklmnopqrstuvwxyz0123456789",
Payload = payload
};

var buffer = serializer.Serialize(publishPacket);
var testChannel = new TestMqttChannel(new MemoryStream(buffer.Array, buffer.Offset, buffer.Count));

var header = MqttPacketReader.ReadFixedHeaderAsync(
testChannel,
new byte[2],
new byte[1],
CancellationToken.None).GetAwaiter().GetResult();

var eof = buffer.Offset + buffer.Count;

var receivedPacket = new ReceivedMqttPacket(
header.Flags,
new MqttPacketBodyReader(buffer.Array, eof - header.RemainingLength, buffer.Count + buffer.Offset));

var packet = (MqttPublishPacket)serializer.Deserialize(receivedPacket);

Assert.AreEqual(publishPacket.Topic, packet.Topic);
Assert.IsTrue(publishPacket.Payload.SequenceEqual(packet.Payload));
}

[TestMethod]
public void SerializeV311_MqttDisconnectPacket()
{
@@ -463,7 +511,7 @@ namespace MQTTnet.Core.Tests
{
var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion };
var data = serializer.Serialize(packet);
Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(data)));
}

@@ -479,10 +527,10 @@ namespace MQTTnet.Core.Tests
var fixedHeader = new byte[2];
var singleByteBuffer = new byte[1];
var header = MqttPacketReader.ReadFixedHeaderAsync(channel, fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult();
using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength))
{
var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0)));
var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0, (int)bodyStream.Length)));
var buffer2 = serializer.Serialize(deserializedPacket);

Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(buffer2)));
@@ -507,7 +555,7 @@ namespace MQTTnet.Core.Tests

using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength))
{
return (T)serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0)));
return (T)serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0, (int)bodyStream.Length)));
}
}
}


+ 4
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs Просмотреть файл

@@ -375,6 +375,9 @@ namespace MQTTnet.Core.Tests
var c1 = await serverAdapter.ConnectTestClient("c1");

await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());

await Task.Delay(250);

await c1.DisconnectAsync();
}
finally
@@ -396,7 +399,7 @@ namespace MQTTnet.Core.Tests
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());

await Task.Delay(500);
await Task.Delay(250);
}
finally
{


+ 7
- 2
Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs Просмотреть файл

@@ -22,13 +22,18 @@ namespace MQTTnet.TestApp.AspNetCore2
.Build();
services
.AddHostedMqttServer(mqttServerOptions)
.AddMqttConnectionHandler();
.AddMqttConnectionHandler()
.AddConnections();
}

// In class _Startup_ of the ASP.NET Core 2.0 project.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseMqttEndpoint();
app.UseConnections(c => c.MapConnectionHandler<MqttConnectionHandler>("/mqtt", options => {
options.WebSockets.SubProtocolSelector = MQTTnet.AspNetCore.ApplicationBuilderExtensions.SelectSubProtocol;
}));

//app.UseMqttEndpoint();
app.UseMqttServer(server =>
{
server.Started += async (sender, args) =>


+ 4
- 3
Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs Просмотреть файл

@@ -3,6 +3,7 @@ using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using Newtonsoft.Json;

@@ -12,7 +13,7 @@ namespace MQTTnet.TestApp.NetCore
{
public static async Task RunAsync()
{
//MqttNetGlobalLogger.LogMessagePublished += (s, e) => Console.WriteLine(e.TraceMessage);
//MqttNetConsoleLogger.ForwardToConsole();

// iot.eclipse.org
await ExecuteTestAsync("iot.eclipse.org TCP",
@@ -35,7 +36,7 @@ namespace MQTTnet.TestApp.NetCore
new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8080/mqtt").Build());

await ExecuteTestAsync("test.mosquitto.org WS TLS",
new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").Build());
new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").WithTls().Build());

// broker.hivemq.com
await ExecuteTestAsync("broker.hivemq.com TCP",
@@ -78,7 +79,7 @@ namespace MQTTnet.TestApp.NetCore
var topic = Guid.NewGuid().ToString();

MqttApplicationMessage receivedMessage = null;
client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage;
client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage;

await client.ConnectAsync(options);
await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj Просмотреть файл

@@ -142,7 +142,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NETCore.UniversalWindowsPlatform">
<Version>6.1.4</Version>
<Version>6.1.5</Version>
</PackageReference>
<PackageReference Include="Microsoft.Toolkit.Uwp.UI.Controls">
<Version>3.0.0</Version>


+ 88
- 39
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml Просмотреть файл

@@ -5,7 +5,6 @@
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:server="using:MQTTnet.Server"
xmlns:interop="using:Windows.UI.Xaml.Interop"
d:DesignHeight="800"
d:DesignWidth="800"
mc:Ignorable="d">
@@ -19,26 +18,55 @@
<Pivot Grid.Row="0">
<PivotItem Header="Connection">
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}">
<TextBlock>Server:</TextBlock>
<TextBox x:Name="Server"></TextBox>
<TextBlock>Port:</TextBlock>
<TextBox x:Name="Port" Text="1883"></TextBox>
<TextBlock>User:</TextBlock>
<TextBox x:Name="User"></TextBox>
<TextBlock>Password:</TextBlock>
<TextBox x:Name="Password"></TextBox>
<TextBlock>ClientId:</TextBlock>
<TextBox x:Name="ClientId"></TextBox>
<TextBlock>Clean session:</TextBlock>
<CheckBox x:Name="CleanSession" IsChecked="True"></CheckBox>
<TextBlock>Keep alive interval:</TextBlock>
<TextBox x:Name="KeepAliveInterval" Text="5"></TextBox>

<StackPanel Orientation="Horizontal">
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton>
<RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton>
<CheckBox x:Name="UseTls">Use TLS</CheckBox>
</StackPanel>
<Grid>
<Grid.Resources>
<Style TargetType="TextBlock">
<Setter Property="VerticalAlignment" Value="Center"></Setter>
<Setter Property="Margin" Value="0, 0, 10, 0"></Setter>
</Style>
</Grid.Resources>
<Grid.ColumnDefinitions>
<ColumnDefinition Width="Auto"></ColumnDefinition>
<ColumnDefinition Width="*"></ColumnDefinition>
</Grid.ColumnDefinitions>
<Grid.RowDefinitions>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
</Grid.RowDefinitions>

<TextBlock Grid.Column="0" Grid.Row="0">Server:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="0" x:Name="Server" Text="localhost"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="1">Port:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="1" x:Name="Port" Text="1883"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="2">Username:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="2" x:Name="User"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="3">Password:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="3" x:Name="Password"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="4">Client ID:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="4" x:Name="ClientId"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="5">Keep Alive interval:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="5" x:Name="KeepAliveInterval" Text="5"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="6">Protocol:</TextBlock>
<StackPanel Grid.Column="1" Grid.Row="6" Orientation="Horizontal">
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton>
<RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton>
<CheckBox x:Name="UseTls">Use TLS</CheckBox>
</StackPanel>
</Grid>
<CheckBox x:Name="CleanSession" IsChecked="True">Clean session</CheckBox>
<CheckBox x:Name="UseManagedClient" IsChecked="False">Use managed client</CheckBox>

<StackPanel Orientation="Horizontal">
<Button Click="Connect" Width="120" Margin="0,0,10,0">Connect</Button>
@@ -49,25 +77,46 @@

<PivotItem Header="Publish">
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}">
<TextBlock>Topic:</TextBlock>
<TextBox x:Name="Topic"></TextBox>
<Grid>
<Grid.Resources>
<Style TargetType="TextBlock">
<Setter Property="VerticalAlignment" Value="Center"></Setter>
<Setter Property="Margin" Value="0, 0, 10, 0"></Setter>
</Style>
</Grid.Resources>
<Grid.ColumnDefinitions>
<ColumnDefinition Width="Auto"></ColumnDefinition>
<ColumnDefinition Width="*"></ColumnDefinition>
</Grid.ColumnDefinitions>
<Grid.RowDefinitions>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
<RowDefinition></RowDefinition>
</Grid.RowDefinitions>

<TextBlock Grid.Column="0" Grid.Row="0">Topic:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="0" x:Name="Topic"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="1">Payload:</TextBlock>
<TextBox Grid.Column="1" Grid.Row="1" x:Name="Payload"></TextBox>

<TextBlock Grid.Column="0" Grid.Row="2">Payload format:</TextBlock>
<StackPanel Grid.Column="1" Grid.Row="2" Orientation="Horizontal">
<RadioButton Margin="0,0,10,0" x:Name="PlainText" IsChecked="True" GroupName="payload">Plain text</RadioButton>
<RadioButton Margin="0,0,10,0" x:Name="Base64" GroupName="payload">Base64 string</RadioButton>
</StackPanel>

<TextBlock Grid.Column="0" Grid.Row="3">QoS level:</TextBlock>
<StackPanel Grid.Column="1" Grid.Row="3" Orientation="Horizontal">
<RadioButton Margin="0,0,10,0" IsChecked="True" GroupName="qos">0 (At most once)</RadioButton>
<RadioButton Margin="0,0,10,0" x:Name="QoS1" GroupName="qos">1 (At least once)</RadioButton>
<RadioButton Margin="0,0,10,0" x:Name="QoS2" GroupName="qos">2 (Exactly once)</RadioButton>
</StackPanel>
</Grid>

<CheckBox x:Name="Retain">Retain</CheckBox>

<TextBlock>Payload:</TextBlock>
<TextBox x:Name="Payload"></TextBox>
<StackPanel Orientation="Horizontal">
<RadioButton x:Name="Text" IsChecked="True" GroupName="payload">Text</RadioButton>
<RadioButton x:Name="Base64" GroupName="payload">Base64</RadioButton>
</StackPanel>

<TextBlock>Retain:</TextBlock>
<CheckBox x:Name="Retain"></CheckBox>

<TextBlock>QoS:</TextBlock>
<StackPanel Orientation="Horizontal">
<RadioButton Margin="0,0,10,0" IsChecked="True" GroupName="qos">0 (At most once)</RadioButton>
<RadioButton Margin="0,0,10,0" x:Name="QoS1" GroupName="qos">1 (At least once)</RadioButton>
<RadioButton Margin="0,0,10,0" x:Name="QoS2" GroupName="qos">2 (Exactly once)</RadioButton>
</StackPanel>
<Button Click="Publish" Width="120">Publish</Button>
</StackPanel>
</PivotItem>


+ 74
- 26
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs Просмотреть файл

@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Windows.Security.Cryptography.Certificates;
@@ -25,12 +26,15 @@ namespace MQTTnet.TestApp.UniversalWindows
private readonly ObservableCollection<IMqttClientSessionStatus> _sessions = new ObservableCollection<IMqttClientSessionStatus>();

private IMqttClient _mqttClient;
private IManagedMqttClient _managedMqttClient;
private IMqttServer _mqttServer;

public MainPage()
{
InitializeComponent();

ClientId.Text = Guid.NewGuid().ToString("D");

MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
}

@@ -81,7 +85,10 @@ namespace MQTTnet.TestApp.UniversalWindows
AllowUntrustedCertificates = true
};

var options = new MqttClientOptions { ClientId = ClientId.Text };
var options = new MqttClientOptions
{
ClientId = ClientId.Text
};

if (UseTcp.IsChecked == true)
{
@@ -127,12 +134,28 @@ namespace MQTTnet.TestApp.UniversalWindows
}

var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;

await _mqttClient.ConnectAsync(options);
if (UseManagedClient.IsChecked == true)
{
_managedMqttClient = factory.CreateManagedMqttClient();
_managedMqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_managedMqttClient.Connected += OnConnected;
_managedMqttClient.Disconnected += OnDisconnected;

await _managedMqttClient.StartAsync(new ManagedMqttClientOptions
{
ClientOptions = options
});
}
else
{
_mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;

await _mqttClient.ConnectAsync(options);
}
}
catch (Exception exception)
{
@@ -171,11 +194,6 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Publish(object sender, RoutedEventArgs e)
{
if (_mqttClient == null)
{
return;
}

try
{
var qos = MqttQualityOfServiceLevel.AtMostOnce;
@@ -190,7 +208,7 @@ namespace MQTTnet.TestApp.UniversalWindows
}

var payload = new byte[0];
if (Text.IsChecked == true)
if (PlainText.IsChecked == true)
{
payload = Encoding.UTF8.GetBytes(Payload.Text);
}
@@ -207,7 +225,15 @@ namespace MQTTnet.TestApp.UniversalWindows
.WithRetainFlag(Retain.IsChecked == true)
.Build();

await _mqttClient.PublishAsync(message);
if (_mqttClient != null)
{
await _mqttClient.PublishAsync(message);
}

if (_managedMqttClient != null)
{
await _managedMqttClient.PublishAsync(message);
}
}
catch (Exception exception)
{
@@ -219,7 +245,19 @@ namespace MQTTnet.TestApp.UniversalWindows
{
try
{
await _mqttClient.DisconnectAsync();
if (_mqttClient != null)
{
await _mqttClient.DisconnectAsync();
_mqttClient.Dispose();
_mqttClient = null;
}
if (_managedMqttClient != null)
{
await _managedMqttClient.StopAsync();
_managedMqttClient.Dispose();
_managedMqttClient = null;
}
}
catch (Exception exception)
{
@@ -239,11 +277,6 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Subscribe(object sender, RoutedEventArgs e)
{
if (_mqttClient == null)
{
return;
}

try
{
var qos = MqttQualityOfServiceLevel.AtMostOnce;
@@ -257,7 +290,15 @@ namespace MQTTnet.TestApp.UniversalWindows
qos = MqttQualityOfServiceLevel.ExactlyOnce;
}

await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos));
if (_mqttClient != null)
{
await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos));
}

if (_managedMqttClient != null)
{
await _managedMqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos));
}
}
catch (Exception exception)
{
@@ -267,14 +308,17 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Unsubscribe(object sender, RoutedEventArgs e)
{
if (_mqttClient == null)
{
return;
}

try
{
await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
if (_mqttClient != null)
{
await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text);
}

if (_managedMqttClient != null)
{
await _managedMqttClient.UnsubscribeAsync(SubscribeTopic.Text);
}
}
catch (Exception exception)
{
@@ -398,6 +442,8 @@ namespace MQTTnet.TestApp.UniversalWindows
ListViewSessions.DataContext = _sessions;
}

#region Wiki Code

private async Task WikiCode()
{
{
@@ -632,5 +678,7 @@ namespace MQTTnet.TestApp.UniversalWindows
await mqttClient.StartAsync(options);
}
}

#endregion
}
}

Загрузка…
Отмена
Сохранить