Browse Source

General code refactoring.

release/3.x.x
Christian Kratky 4 years ago
parent
commit
dbe12b9c48
27 changed files with 191 additions and 118 deletions
  1. +3
    -3
      Build/MQTTnet.nuspec
  2. +1
    -1
      Source/MQTTnet.AspnetCore/Extensions/ApplicationBuilderExtensions.cs
  3. +1
    -1
      Source/MQTTnet.AspnetCore/Extensions/ConnectionBuilderExtensions.cs
  4. +4
    -1
      Source/MQTTnet.AspnetCore/Extensions/ConnectionRouteBuilderExtensions.cs
  5. +1
    -1
      Source/MQTTnet.AspnetCore/Extensions/ReaderExtensions.cs
  6. +3
    -3
      Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs
  7. +1
    -0
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  8. +12
    -12
      Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs
  9. +5
    -11
      Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs
  10. +2
    -3
      Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs
  11. +12
    -0
      Source/MQTTnet.Server/.config/dotnet-tools.json
  12. +6
    -3
      Source/MQTTnet.Server/MQTTnet.Server.csproj
  13. +27
    -4
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  14. +12
    -53
      Source/MQTTnet/Implementations/CrossPlatformSocket.cs
  15. +6
    -6
      Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs
  16. +4
    -6
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  17. +1
    -1
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  18. +2
    -2
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  19. +1
    -1
      Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
  20. +5
    -2
      Source/MQTTnet/Server/IMqttServerCertificateCredentials.cs
  21. +4
    -2
      Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj
  22. +1
    -0
      Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs
  23. +2
    -2
      Tests/MQTTnet.AspNetCore.Tests/ReaderExtensionsTest.cs
  24. +1
    -0
      Tests/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs
  25. +72
    -0
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  26. +1
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/Program.cs
  27. +1
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs

+ 3
- 3
Build/MQTTnet.nuspec View File

@@ -12,9 +12,9 @@
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [Client] Fixed issue in keep alive handling.
* [MQTTnet.Server] Added support for delivering static files (HTML, JavaScript etc.).
* [MQTTnet.Server] Fixed web socket protocol errors (when using paho JS etc.).
* [RcpClient] Adjusted some namespaces (BREAKING CHANGE!)
* [AspNetCore] Adjusted some namespaces (BREAKING CHANGE!)
* [Server] Adjusted some namespaces (BREAKING CHANGE!)
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 1
- 1
Source/MQTTnet.AspnetCore/Extensions/ApplicationBuilderExtensions.cs View File

@@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Server;

namespace MQTTnet.AspNetCore
namespace MQTTnet.AspNetCore.Extensions
{
public static class ApplicationBuilderExtensions
{


+ 1
- 1
Source/MQTTnet.AspnetCore/Extensions/ConnectionBuilderExtensions.cs View File

@@ -1,6 +1,6 @@
using Microsoft.AspNetCore.Connections;

namespace MQTTnet.AspNetCore
namespace MQTTnet.AspNetCore.Extensions
{
public static class ConnectionBuilderExtensions
{


+ 4
- 1
Source/MQTTnet.AspnetCore/Extensions/ConnectionRouteBuilderExtensions.cs View File

@@ -1,8 +1,11 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Connections;

#if NETCOREAPP3_1
using System;
#endif

namespace MQTTnet.AspNetCore
namespace MQTTnet.AspNetCore.Extensions
{
public static class ConnectionRouteBuilderExtensions
{


+ 1
- 1
Source/MQTTnet.AspnetCore/Extensions/ReaderExtensions.cs View File

@@ -5,7 +5,7 @@ using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Packets;

namespace MQTTnet.AspNetCore
namespace MQTTnet.AspNetCore.Extensions
{
public static class ReaderExtensions
{


+ 3
- 3
Source/MQTTnet.AspnetCore/Extensions/ServiceCollectionExtensions.cs View File

@@ -1,12 +1,12 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Server;
using System;

namespace MQTTnet.AspNetCore
namespace MQTTnet.AspNetCore.Extensions
{
public static class ServiceCollectionExtensions
{


+ 1
- 0
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -10,6 +10,7 @@ using System.IO.Pipelines;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.AspNetCore.Extensions;

namespace MQTTnet.AspNetCore
{


+ 12
- 12
Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs View File

@@ -8,9 +8,9 @@ namespace MQTTnet.AspNetCore
{
public class SpanBasedMqttPacketBodyReader : IMqttPacketBodyReader
{
private ReadOnlyMemory<byte> _buffer;
ReadOnlyMemory<byte> _buffer;

private int _offset;
int _offset;
public void SetBuffer(ReadOnlyMemory<byte> buffer)
{
@@ -38,16 +38,6 @@ namespace MQTTnet.AspNetCore
{
return ReadSegmentWithLengthPrefix().ToArray();
}

private ReadOnlySpan<byte> ReadSegmentWithLengthPrefix()
{
var span = _buffer.Span;
var length = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(_offset));

var result = span.Slice(_offset+2, length);
_offset += 2 + length;
return result;
}
public unsafe string ReadStringWithLengthPrefix()
{
@@ -121,5 +111,15 @@ namespace MQTTnet.AspNetCore
{
_offset = position;
}

ReadOnlySpan<byte> ReadSegmentWithLengthPrefix()
{
var span = _buffer.Span;
var length = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(_offset));

var result = span.Slice(_offset + 2, length);
_offset += 2 + length;
return result;
}
}
}

+ 5
- 11
Source/MQTTnet.AspnetCore/SpanBasedMqttPacketWriter.cs View File

@@ -8,16 +8,10 @@ namespace MQTTnet.AspNetCore
{
public class SpanBasedMqttPacketWriter : IMqttPacketWriter
{
private readonly ArrayPool<byte> _pool;
readonly ArrayPool<byte> _pool = ArrayPool<byte>.Create();

public SpanBasedMqttPacketWriter()
{
_pool = ArrayPool<byte>.Create();
}

private byte[] _buffer;
private int _position;
byte[] _buffer;
int _position;

public int Length { get; set; }

@@ -112,7 +106,7 @@ namespace MQTTnet.AspNetCore
Commit(payload.Length);
}

private void Commit(int count)
void Commit(int count)
{
if (_position == Length)
{
@@ -122,7 +116,7 @@ namespace MQTTnet.AspNetCore
_position += count;
}

private void GrowIfNeeded(int requiredAdditional)
void GrowIfNeeded(int requiredAdditional)
{
var requiredTotal = _position + requiredAdditional;
if (_buffer.Length >= requiredTotal)


+ 2
- 3
Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs View File

@@ -1,7 +1,6 @@
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
using System;
using System;

namespace MQTTnet.Extensions.Rpc.Options
namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration
{
public class DefaultMqttRpcClientTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy
{


+ 12
- 0
Source/MQTTnet.Server/.config/dotnet-tools.json View File

@@ -0,0 +1,12 @@
{
"version": 1,
"isRoot": true,
"tools": {
"dotnet-ef": {
"version": "3.1.4",
"commands": [
"dotnet-ef"
]
}
}
}

+ 6
- 3
Source/MQTTnet.Server/MQTTnet.Server.csproj View File

@@ -8,19 +8,22 @@
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<Company />
<Product>MQTTnet</Product>
<Description />
<Authors />
<Description>Standalone MQTT server with Python scripting backend.</Description>
<Authors>The authors of MQTTnet.Server.</Authors>
<PackageId />
<SignAssembly>false</SignAssembly>
<DelaySign>false</DelaySign>
<LangVersion>latest</LangVersion>
<Version>1.0.0</Version>
<Version>3.0.11</Version>
<TypeScriptToolsVersion>3.3</TypeScriptToolsVersion>
</PropertyGroup>

<PropertyGroup>
<StartupObject>MQTTnet.Server.Program</StartupObject>
<UserSecretsId>c564f0de-28b4-45bf-b726-4d665d705653</UserSecretsId>
<Copyright>(c) Christian Kratky 2016-2020</Copyright>
<PackageProjectUrl>https://github.com/chkr1011/MQTTnet</PackageProjectUrl>
<RepositoryUrl>https://github.com/chkr1011/MQTTnet</RepositoryUrl>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">


+ 27
- 4
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -59,6 +59,7 @@ namespace MQTTnet.Adapter
public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
cancellationToken.ThrowIfCancellationRequested();

try
{
@@ -85,6 +86,7 @@ namespace MQTTnet.Adapter
public async Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
cancellationToken.ThrowIfCancellationRequested();

try
{
@@ -112,8 +114,17 @@ namespace MQTTnet.Adapter
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
cancellationToken.ThrowIfCancellationRequested();

try
{
await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
throw new OperationCanceledException();
}

await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var packetData = PacketFormatterAdapter.Encode(packet);
@@ -144,13 +155,22 @@ namespace MQTTnet.Adapter
finally
{
PacketFormatterAdapter.FreeBuffer();
_writerSemaphore?.Release();

try
{
_writerSemaphore.Release();
}
catch (ObjectDisposedException)
{
throw new OperationCanceledException();
}
}
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
cancellationToken.ThrowIfCancellationRequested();

try
{
@@ -189,6 +209,9 @@ namespace MQTTnet.Adapter
catch (OperationCanceledException)
{
}
catch (ObjectDisposedException)
{
}
catch (Exception exception)
{
if (IsWrappedException(exception))
@@ -212,8 +235,8 @@ namespace MQTTnet.Adapter
{
if (disposing)
{
_channel?.Dispose();
_writerSemaphore?.Dispose();
_channel.Dispose();
_writerSemaphore.Dispose();
}

base.Dispose(disposing);


+ 12
- 53
Source/MQTTnet/Implementations/CrossPlatformSocket.cs View File

@@ -20,7 +20,7 @@ namespace MQTTnet.Implementations

public CrossPlatformSocket()
{
// Having this contructor is important because avoiding the address family as parameter
// Having this constructor is important because avoiding the address family as parameter
// will make use of dual mode in the .net framework.
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}
@@ -33,75 +33,34 @@ namespace MQTTnet.Implementations

public bool NoDelay
{
get
{
return (int)_socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay) > 0;
}

set
{
_socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, value ? 1 : 0);
}
get => (int)_socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay) > 0;
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, value ? 1 : 0);
}

public bool DualMode
{
get
{
return _socket.DualMode;
}

set
{
_socket.DualMode = value;
}
get => _socket.DualMode;
set => _socket.DualMode = value;
}

public int ReceiveBufferSize
{
get
{
return _socket.ReceiveBufferSize;
}

set
{
_socket.ReceiveBufferSize = value;
}
get => _socket.ReceiveBufferSize;
set => _socket.ReceiveBufferSize = value;
}

public int SendBufferSize
{
get
{
return _socket.SendBufferSize;
}

set
{
_socket.SendBufferSize = value;
}
get => _socket.SendBufferSize;
set => _socket.SendBufferSize = value;
}

public EndPoint RemoteEndPoint
{
get
{
return _socket.RemoteEndPoint;
}
}
public EndPoint RemoteEndPoint => _socket.RemoteEndPoint;

public bool ReuseAddress
{
get
{
return (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress) != 0;
}

set
{
_socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value ? 1 : 0);
}
get => (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress) != 0;
set => _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value ? 1 : 0);
}

public async Task<CrossPlatformSocket> AcceptAsync()


+ 6
- 6
Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs View File

@@ -17,14 +17,14 @@ using MQTTnet.Server;

namespace MQTTnet.Implementations
{
public class MqttTcpChannel : IMqttChannel
public sealed class MqttTcpChannel : IMqttChannel
{
private readonly MqttClientTcpOptions _options;
private readonly int _bufferSize;
readonly MqttClientTcpOptions _options;
readonly int _bufferSize;

private StreamSocket _socket;
private Stream _readStream;
private Stream _writeStream;
StreamSocket _socket;
Stream _readStream;
Stream _writeStream;

public MqttTcpChannel(IMqttClientOptions clientOptions)
{


+ 4
- 6
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -13,7 +13,7 @@ using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IDisposable, IMqttChannel
public sealed class MqttTcpChannel : IMqttChannel
{
readonly IMqttClientOptions _clientOptions;
readonly MqttClientTcpOptions _options;
@@ -162,7 +162,6 @@ namespace MQTTnet.Implementations
}
catch (ObjectDisposedException)
{
return;
}
catch (IOException exception)
{
@@ -205,7 +204,6 @@ namespace MQTTnet.Implementations
{
return certificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions);
}

#endregion

var certificateValidationHandler = _options?.TlsOptions?.CertificateValidationHandler;
@@ -229,7 +227,7 @@ namespace MQTTnet.Implementations

if (chain.ChainStatus.Any(c => c.Status == X509ChainStatusFlags.RevocationStatusUnknown || c.Status == X509ChainStatusFlags.Revoked || c.Status == X509ChainStatusFlags.OfflineRevocation))
{
if (!_options.TlsOptions.IgnoreCertificateRevocationErrors)
if (_options?.TlsOptions?.IgnoreCertificateRevocationErrors != true)
{
return false;
}
@@ -237,13 +235,13 @@ namespace MQTTnet.Implementations

if (chain.ChainStatus.Any(c => c.Status == X509ChainStatusFlags.PartialChain))
{
if (!_options.TlsOptions.IgnoreCertificateChainErrors)
if (_options?.TlsOptions?.IgnoreCertificateChainErrors != true)
{
return false;
}
}

return _options.TlsOptions.AllowUntrustedCertificates;
return _options?.TlsOptions?.AllowUntrustedCertificates == true;
}

X509CertificateCollection LoadCertificates()


+ 1
- 1
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -130,7 +130,7 @@ namespace MQTTnet.Implementations
}
}

private Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter)
Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter)
{
var clientHandler = ClientHandler;
if (clientHandler == null)


+ 2
- 2
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -24,8 +24,8 @@ namespace MQTTnet.Implementations
readonly MqttServerTlsTcpEndpointOptions _tlsOptions;
readonly X509Certificate2 _tlsCertificate;

private CrossPlatformSocket _socket;
private IPEndPoint _localEndPoint;
CrossPlatformSocket _socket;
IPEndPoint _localEndPoint;

public MqttTcpServerListener(
AddressFamily addressFamily,


+ 1
- 1
Source/MQTTnet/Implementations/MqttWebSocketChannel.cs View File

@@ -35,7 +35,7 @@ namespace MQTTnet.Implementations

public bool IsSecureConnection { get; private set; }

public X509Certificate2 ClientCertificate { get; private set; }
public X509Certificate2 ClientCertificate { get; }

public async Task ConnectAsync(CancellationToken cancellationToken)
{


+ 5
- 2
Source/MQTTnet/Server/IMqttServerCertificateCredentials.cs View File

@@ -1,4 +1,7 @@
public interface IMqttServerCertificateCredentials
namespace MQTTnet.Server
{
string Password { get; }
public interface IMqttServerCertificateCredentials
{
string Password { get; }
}
}

+ 4
- 2
Tests/MQTTnet.AspNetCore.Tests/MQTTnet.AspNetCore.Tests.csproj View File

@@ -6,13 +6,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.2.7" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.0" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.1" />
<PackageReference Include="MSTest.TestFramework" Version="2.1.1" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp3.1' ">
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.2.7" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.2.0" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp3.1' ">
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.1.1" />


+ 1
- 0
Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs View File

@@ -15,6 +15,7 @@ using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using System.Net;
using MQTTnet.AspNetCore.Extensions;

namespace MQTTnet.AspNetCore.Tests
{


+ 2
- 2
Tests/MQTTnet.AspNetCore.Tests/ReaderExtensionsTest.cs View File

@@ -1,6 +1,7 @@
#if NETCOREAPP
#if NETCOREAPP3_1
using System.Buffers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.AspNetCore.Extensions;
using MQTTnet.Formatter;
using MQTTnet.Packets;

@@ -31,7 +32,6 @@ namespace MQTTnet.AspNetCore.Tests
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed, out read);
Assert.IsFalse(result);


part = sequence.Slice(sequence.Start, 1); // partial fixed header should fail
result = serializer.TryDecode(reader, part, out packet, out consumed, out observed, out read);
Assert.IsFalse(result);


+ 1
- 0
Tests/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs View File

@@ -9,6 +9,7 @@ using Microsoft.AspNetCore.Hosting;
using MQTTnet.Server;
using MQTTnet.Diagnostics;
using MQTTnet.AspNetCore.Client;
using MQTTnet.AspNetCore.Extensions;
using MQTTnet.Client.Options;

namespace MQTTnet.Benchmarks


+ 72
- 0
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -274,6 +274,78 @@ namespace MQTTnet.Tests
}
}

[TestMethod]
public async Task Subscribe_Lots_In_Single_Request()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

await testEnvironment.StartServerAsync();

var c1 = await testEnvironment.ConnectClientAsync();
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));

var optionsBuilder = new MqttClientSubscribeOptionsBuilder();
for (var i = 0; i < 1000; i++)
{
optionsBuilder.WithTopicFilter(i.ToString(), MqttQualityOfServiceLevel.AtMostOnce);
}

await c1.SubscribeAsync(optionsBuilder.Build()).ConfigureAwait(false);

var c2 = await testEnvironment.ConnectClientAsync();

var messageBuilder = new MqttApplicationMessageBuilder();
for (var i = 0; i < 1000; i++)
{
messageBuilder.WithTopic(i.ToString());

await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);
}

await Task.Delay(500).ConfigureAwait(false);

Assert.AreEqual(1000, receivedMessagesCount);
}
}

[TestMethod]
public async Task Subscribe_Lots_In_Multiple_Requests()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

await testEnvironment.StartServerAsync();

var c1 = await testEnvironment.ConnectClientAsync();
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));

for (var i = 0; i < 1000; i++)
{
var so = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(i.ToString(), MqttQualityOfServiceLevel.AtMostOnce).Build();

await c1.SubscribeAsync(so).ConfigureAwait(false);
}
var c2 = await testEnvironment.ConnectClientAsync();

var messageBuilder = new MqttApplicationMessageBuilder();
for (var i = 0; i < 1000; i++)
{
messageBuilder.WithTopic(i.ToString());

await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);
}

await Task.Delay(500).ConfigureAwait(false);

Assert.AreEqual(1000, receivedMessagesCount);
}
}

[TestMethod]
public async Task Subscribe_Multiple_In_Multiple_Request()
{


+ 1
- 0
Tests/MQTTnet.TestApp.AspNetCore2/Program.cs View File

@@ -2,6 +2,7 @@
using Microsoft.AspNetCore.Hosting;
using MQTTnet.AspNetCore;
using System.Threading.Tasks;
using MQTTnet.AspNetCore.Extensions;

namespace MQTTnet.TestApp.AspNetCore2
{


+ 1
- 0
Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs View File

@@ -9,6 +9,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Logging;
using MQTTnet.AspNetCore;
using MQTTnet.AspNetCore.Extensions;
using MQTTnet.Server;

namespace MQTTnet.TestApp.AspNetCore2


Loading…
Cancel
Save