Bläddra i källkod

Refactor options

release/3.x.x
Christian Kratky 7 år sedan
förälder
incheckning
67267c05fd
40 ändrade filer med 292 tillägg och 198 borttagningar
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +27
    -0
      Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs
  3. +8
    -8
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  4. +12
    -10
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  5. +1
    -0
      Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
  6. +1
    -18
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  7. +27
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
  8. +8
    -7
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  9. +9
    -8
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  10. +1
    -18
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  11. +27
    -0
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
  12. +9
    -9
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  13. +9
    -8
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  14. +1
    -0
      Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
  15. +1
    -18
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  16. +1
    -2
      MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
  17. +2
    -2
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  18. +1
    -2
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  19. +1
    -1
      MQTTnet.Core/Client/IMqttClient.cs
  20. +9
    -0
      MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs
  21. +13
    -10
      MQTTnet.Core/Client/MqttClient.cs
  22. +3
    -7
      MQTTnet.Core/Client/MqttClientOptions.cs
  23. +9
    -0
      MQTTnet.Core/Client/MqttClientTcpOptions.cs
  24. +2
    -2
      MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs
  25. +7
    -0
      MQTTnet.Core/Client/MqttClientWebSocketOptions.cs
  26. +0
    -8
      MQTTnet.Core/Client/MqttConnectionType.cs
  27. +3
    -0
      MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
  28. +20
    -18
      MQTTnet.Core/Server/MqttClientSession.cs
  29. +2
    -3
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  30. +1
    -1
      MQTTnet.sln
  31. +1
    -1
      README.md
  32. +20
    -0
      Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs
  33. +1
    -2
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  34. +0
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  35. +1
    -2
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  36. +7
    -7
      Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
  37. +5
    -6
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  38. +3
    -3
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs
  39. +11
    -6
      Tests/MQTTnet.TestApp.NetFramework/Program.cs
  40. +27
    -10
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 1
- 0
Build/MQTTnet.nuspec Visa fil

@@ -16,6 +16,7 @@
* [Server] The client connection is now closed if sending of one pending application message has failed
* [Server] Fixed handling of _Dup_ flag (Thanks to haeberle)
* [Core] Optimized exception handling
* [Core] Mono is now also supported (Thanks to JTrotta)
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M</tags>


+ 27
- 0
Frameworks/MQTTnet.NetFramework/Implementations/MqttCommunicationAdapterFactory.cs Visa fil

@@ -0,0 +1,27 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (options is MqttClientTcpOptions tcpOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientWebSocketOptions webSocketOptions)
{
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}
}

+ 8
- 8
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs Visa fil

@@ -12,6 +12,8 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientTcpOptions _options;

// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.
@@ -22,9 +24,9 @@ namespace MQTTnet.Implementations
/// <summary>
/// called on client sockets are created in connect
/// </summary>
public MqttTcpChannel()
public MqttTcpChannel(MqttClientTcpOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

/// <summary>
@@ -42,22 +44,20 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false);

if (options.TlsOptions.UseTls)
if (_options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));

await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}

CreateStreams(_socket, _sslStream);


+ 12
- 10
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs Visa fil

@@ -10,27 +10,29 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientWebSocketOptions _options;

private ClientWebSocket _webSocket;

public MqttWebSocketChannel(MqttClientWebSocketOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public Stream RawReceiveStream { get; private set; }
public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;

public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
var uri = options.Server;
var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}

if (options.Port.HasValue)
{
uri += ":" + options.Port;
}
_webSocket = new ClientWebSocket();
_webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
_webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false);
RawReceiveStream = new WebSocketStream(_webSocket);
}


+ 1
- 0
Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj Visa fil

@@ -100,6 +100,7 @@
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="Implementations\MqttCommunicationAdapterFactory.cs" />
<Compile Include="Implementations\MqttWebSocketChannel.cs" />
<Compile Include="Implementations\WebSocketStream.cs" />
<Compile Include="MqttClientFactory.cs" />


+ 1
- 18
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs Visa fil

@@ -1,8 +1,5 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;

namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case MqttConnectionType.Tcp:
return new MqttTcpChannel();
case MqttConnectionType.Ws:
return new MqttWebSocketChannel();

default:
throw new NotSupportedException();
}
return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}

+ 27
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs Visa fil

@@ -0,0 +1,27 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (options is MqttClientTcpOptions tcpOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientWebSocketOptions webSocketOptions)
{
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}
}

+ 8
- 7
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs Visa fil

@@ -12,14 +12,17 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientTcpOptions _options;

private Socket _socket;
private SslStream _sslStream;
/// <summary>
/// called on client sockets are created in connect
/// </summary>
public MqttTcpChannel()
public MqttTcpChannel(MqttClientTcpOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

/// <summary>
@@ -37,22 +40,20 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream => ReceiveStream;
public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (_socket == null)
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false);
await _socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false);

if (options.TlsOptions.UseTls)
if (_options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true));
ReceiveStream = _sslStream;
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
}
else
{


+ 9
- 8
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs Visa fil

@@ -10,27 +10,28 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientWebSocketOptions _options;
private ClientWebSocket _webSocket;

public MqttWebSocketChannel(MqttClientWebSocketOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
public Stream RawReceiveStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
var uri = options.Server;
var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}

if (options.Port.HasValue)
{
uri += ":" + options.Port;
}
_webSocket = new ClientWebSocket();
_webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
_webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None);
RawReceiveStream = new WebSocketStream(_webSocket);
}


+ 1
- 18
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs Visa fil

@@ -1,8 +1,5 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;

namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case MqttConnectionType.Tcp:
return new MqttTcpChannel();
case MqttConnectionType.Ws:
return new MqttWebSocketChannel();

default:
throw new NotSupportedException();
}
return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}

+ 27
- 0
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs Visa fil

@@ -0,0 +1,27 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (options is MqttClientTcpOptions tcpOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientWebSocketOptions webSocketOptions)
{
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}
}

+ 9
- 9
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs Visa fil

@@ -13,10 +13,12 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientTcpOptions _options;
private StreamSocket _socket;

public MqttTcpChannel()
public MqttTcpChannel(MqttClientTcpOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public MqttTcpChannel(StreamSocket socket)
@@ -29,30 +31,28 @@ namespace MQTTnet.Implementations
public Stream ReceiveStream { get; private set; }
public Stream RawReceiveStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (_socket == null)
{
_socket = new StreamSocket();
}

if (!options.TlsOptions.UseTls)
if (!_options.TlsOptions.UseTls)
{
await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString());
await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString());
}
else
{
_socket.Control.ClientCertificate = LoadCertificate(options);
_socket.Control.ClientCertificate = LoadCertificate(_options);

if (!options.TlsOptions.CheckCertificateRevocation)
if (!_options.TlsOptions.CheckCertificateRevocation)
{
_socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.IncompleteChain);
_socket.Control.IgnorableServerCertificateErrors.Add(ChainValidationResult.RevocationInformationMissing);
}

await _socket.ConnectAsync(new HostName(options.Server), options.GetPort().ToString(), SocketProtectionLevel.Tls12);
await _socket.ConnectAsync(new HostName(_options.Server), _options.GetPort().ToString(), SocketProtectionLevel.Tls12);
}

CreateStreams();


+ 9
- 8
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs Visa fil

@@ -10,27 +10,28 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private readonly MqttClientWebSocketOptions _options;
private ClientWebSocket _webSocket = new ClientWebSocket();

public MqttWebSocketChannel(MqttClientWebSocketOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public Stream RawReceiveStream { get; private set; }
public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;

public async Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync()
{
var uri = options.Server;
var uri = _options.Uri;
if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase))
{
uri = "ws://" + uri;
}

if (options.Port.HasValue)
{
uri += ":" + options.Port;
}
_webSocket = new ClientWebSocket();
_webSocket.Options.KeepAliveInterval = options.KeepAlivePeriod;
_webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false);

RawReceiveStream = new WebSocketStream(_webSocket);


+ 1
- 0
Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj Visa fil

@@ -110,6 +110,7 @@
<DocumentationFile>bin\x64\Release\MQTTnet.XML</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Compile Include="Implementations\MqttCommunicationAdapterFactory.cs" />
<Compile Include="Implementations\MqttWebSocketChannel.cs" />
<Compile Include="MqttClientFactory.cs" />
<Compile Include="Implementations\MqttServerAdapter.cs" />


+ 1
- 18
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs Visa fil

@@ -1,8 +1,5 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;

namespace MQTTnet
@@ -13,21 +10,7 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case MqttConnectionType.Tcp:
return new MqttTcpChannel();
case MqttConnectionType.Ws:
return new MqttWebSocketChannel();

default:
throw new NotSupportedException();
}
return new MqttClient(new MqttCommunicationAdapterFactory());
}
}
}

+ 1
- 2
MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs Visa fil

@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;

@@ -12,7 +11,7 @@ namespace MQTTnet.Core.Adapter
{
IMqttPacketSerializer PacketSerializer { get; }

Task ConnectAsync(TimeSpan timeout, MqttClientOptions options);
Task ConnectAsync(TimeSpan timeout);

Task DisconnectAsync(TimeSpan timeout);



+ 2
- 2
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs Visa fil

@@ -26,11 +26,11 @@ namespace MQTTnet.Core.Adapter

public IMqttPacketSerializer PacketSerializer { get; }

public async Task ConnectAsync(TimeSpan timeout, MqttClientOptions options)
public async Task ConnectAsync(TimeSpan timeout)
{
try
{
await _channel.ConnectAsync(options).TimeoutAfter(timeout).ConfigureAwait(false);
await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (TaskCanceledException)
{


+ 1
- 2
MQTTnet.Core/Channel/IMqttCommunicationChannel.cs Visa fil

@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using System.IO;

namespace MQTTnet.Core.Channel
@@ -10,7 +9,7 @@ namespace MQTTnet.Core.Channel
Stream ReceiveStream { get; }
Stream RawReceiveStream { get; }

Task ConnectAsync(MqttClientOptions options);
Task ConnectAsync();
Task DisconnectAsync();
}
}

+ 1
- 1
MQTTnet.Core/Client/IMqttClient.cs Visa fil

@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client
event EventHandler Connected;
event EventHandler Disconnected;

Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null);
Task ConnectAsync(MqttClientOptions options);
Task DisconnectAsync();

Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters);


+ 9
- 0
MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs Visa fil

@@ -0,0 +1,9 @@
using MQTTnet.Core.Adapter;

namespace MQTTnet.Core.Client
{
public interface IMqttCommunicationAdapterFactory
{
IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options);
}
}

+ 13
- 10
MQTTnet.Core/Client/MqttClient.cs Visa fil

@@ -16,19 +16,17 @@ namespace MQTTnet.Core.Client
{
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly MqttClientOptions _options;
private readonly IMqttCommunicationAdapter _adapter;
private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory;

private MqttClientOptions _options;
private bool _isReceivingPackets;
private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;

public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter)
public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));

_adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion;
_communicationChannelFactory = communicationChannelFactory;
}

public event EventHandler Connected;
@@ -37,22 +35,27 @@ namespace MQTTnet.Core.Client

public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;

public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null)
public async Task ConnectAsync(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");

try
{
_options = options;
_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_packetDispatcher.Reset();

_adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options);
MqttTrace.Verbose(nameof(MqttClient), "Trying to connect with server.");
await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false);
await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");

await SetupIncomingPacketProcessingAsync();
await AuthenticateAsync(willApplicationMessage);
await AuthenticateAsync(options.WillMessage);

MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");



+ 3
- 7
MQTTnet.Core/Client/MqttClientOptions.cs Visa fil

@@ -3,14 +3,12 @@ using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Client
{
public sealed class MqttClientOptions
public abstract class MqttClientOptions
{
public string Server { get; set; }

public int? Port { get; set; }

public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();

public MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }

public string Password { get; set; }
@@ -24,7 +22,5 @@ namespace MQTTnet.Core.Client
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp;
}
}

+ 9
- 0
MQTTnet.Core/Client/MqttClientTcpOptions.cs Visa fil

@@ -0,0 +1,9 @@
namespace MQTTnet.Core.Client
{
public class MqttClientTcpOptions : MqttClientOptions
{
public string Server { get; set; }

public int? Port { get; set; }
}
}

MQTTnet.Core/Client/MqttClientOptionsExtensions.cs → MQTTnet.Core/Client/MqttClientTcpOptionsExtensions.cs Visa fil

@@ -2,9 +2,9 @@

namespace MQTTnet.Core.Client
{
public static class MqttClientOptionsExtensions
public static class MqttClientTcpOptionsExtensions
{
public static int GetPort(this MqttClientOptions options)
public static int GetPort(this MqttClientTcpOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));


+ 7
- 0
MQTTnet.Core/Client/MqttClientWebSocketOptions.cs Visa fil

@@ -0,0 +1,7 @@
namespace MQTTnet.Core.Client
{
public class MqttClientWebSocketOptions : MqttClientOptions
{
public string Uri { get; set; }
}
}

+ 0
- 8
MQTTnet.Core/Client/MqttConnectionType.cs Visa fil

@@ -1,8 +0,0 @@
namespace MQTTnet.Core.Client
{
public enum MqttConnectionType
{
Tcp,
Ws
}
}

+ 3
- 0
MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs Visa fil

@@ -72,6 +72,9 @@ namespace MQTTnet.Core.Server
{
MqttTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception.");
}
if (exception is OperationCanceledException)
{
}
else
{
MqttTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed.");


+ 20
- 18
MQTTnet.Core/Server/MqttClientSession.cs Visa fil

@@ -8,6 +8,7 @@ using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Server
{
@@ -19,7 +20,8 @@ namespace MQTTnet.Core.Server
private readonly MqttClientSessionsManager _mqttClientSessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options;

private IMqttCommunicationAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;

@@ -33,9 +35,9 @@ namespace MQTTnet.Core.Server

public string ClientId { get; }

public bool IsConnected => Adapter != null;
public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;

public IMqttCommunicationAdapter Adapter { get; private set; }
public bool IsConnected => _adapter != null;

public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
{
@@ -45,7 +47,7 @@ namespace MQTTnet.Core.Server

try
{
Adapter = adapter;
_adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource();

_pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token);
@@ -75,7 +77,7 @@ namespace MQTTnet.Core.Server
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

Adapter = null;
_adapter = null;

MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
}
@@ -106,7 +108,7 @@ namespace MQTTnet.Core.Server
while (!cancellationToken.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
await ProcessReceivedPacketAsync(packet).ConfigureAwait(false);
await ProcessReceivedPacketAsync(adapter, packet).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -124,28 +126,28 @@ namespace MQTTnet.Core.Server
}
}

private async Task ProcessReceivedPacketAsync(MqttBasePacket packet)
private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet)
{
if (packet is MqttSubscribePacket subscribePacket)
{
await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket));
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket));
EnqueueRetainedMessages(subscribePacket);
}
else if (packet is MqttUnsubscribePacket unsubscribePacket)
{
await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket));
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket));
}
else if (packet is MqttPublishPacket publishPacket)
{
await HandleIncomingPublishPacketAsync(publishPacket);
await HandleIncomingPublishPacketAsync(adapter, publishPacket);
}
else if (packet is MqttPubRelPacket pubRelPacket)
{
await HandleIncomingPubRelPacketAsync(pubRelPacket);
await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket);
}
else if (packet is MqttPubRecPacket pubRecPacket)
{
await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse<MqttPubRelPacket>());
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse<MqttPubRelPacket>());
}
else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
{
@@ -153,7 +155,7 @@ namespace MQTTnet.Core.Server
}
else if (packet is MqttPingReqPacket)
{
await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket());
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket());
}
else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
@@ -175,7 +177,7 @@ namespace MQTTnet.Core.Server
}
}

private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket)
{
if (publishPacket.Retain)
{
@@ -191,7 +193,7 @@ namespace MQTTnet.Core.Server
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
return;
}

@@ -205,21 +207,21 @@ namespace MQTTnet.Core.Server

_mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);

await Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
return;
}

throw new MqttCommunicationException("Received a not supported QoS level.");
}

private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket)
{
lock (_unacknowledgedPublishPackets)
{
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}

return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
}
}
}

+ 2
- 3
MQTTnet.Core/Server/MqttClientSessionsManager.cs Visa fil

@@ -9,6 +9,7 @@ using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Server
{
@@ -24,9 +25,7 @@ namespace MQTTnet.Core.Server
}

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;

public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }
@@ -111,7 +110,7 @@ namespace MQTTnet.Core.Server
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.Adapter.PacketSerializer.ProtocolVersion
ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311
}).ToList();
}
}


+ 1
- 1
MQTTnet.sln Visa fil

@@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.16
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NetFramework", "Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}"
EndProject


+ 1
- 1
README.md Visa fil

@@ -18,7 +18,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien
* Performance optimized (processing ~27.000 messages / second)*
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (57+ tests)
* Unit tested (58+ tests)

\* Tested on local machine with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetFramework_.



+ 20
- 0
Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs Visa fil

@@ -0,0 +1,20 @@
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;

namespace MQTTnet.Core.Tests
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
private readonly IMqttCommunicationAdapter _adapter;

public MqttCommunicationAdapterFactory(IMqttCommunicationAdapter adapter)
{
_adapter = adapter;
}

public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
{
return _adapter;
}
}
}

+ 1
- 2
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs Visa fil

@@ -6,7 +6,6 @@ using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Serializer;
@@ -411,7 +410,7 @@ namespace MQTTnet.Core.Tests
_stream.Position = 0;
}

public Task ConnectAsync(MqttClientOptions options)
public Task ConnectAsync()
{
return Task.FromResult(0);
}


+ 0
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs Visa fil

@@ -6,7 +6,6 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;

namespace MQTTnet.Core.Tests
{


+ 1
- 2
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs Visa fil

@@ -4,7 +4,6 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Serializer;

@@ -18,7 +17,7 @@ namespace MQTTnet.Core.Tests

public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

public Task ConnectAsync(TimeSpan timeout, MqttClientOptions options)
public Task ConnectAsync(TimeSpan timeout)
{
return Task.FromResult(0);
}


+ 7
- 7
Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs Visa fil

@@ -17,10 +17,11 @@ namespace MQTTnet.Core.Tests
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;

var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA);
var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA));
var connected = WaitForClientToConnect(server, clientId);

FireClientAcceptedEvent(adapterB);
await client.ConnectAsync(willMessage);
await client.ConnectAsync(new MqttClientTcpOptions { ClientId = clientId, WillMessage = willMessage });
await connected;

return client;
@@ -30,17 +31,16 @@ namespace MQTTnet.Core.Tests
{
var tcs = new TaskCompletionSource<object>();

EventHandler<MqttClientConnectedEventArgs> handler = null;
handler = (sender, args) =>
void Handler(object sender, MqttClientConnectedEventArgs args)
{
if (args.Client.ClientId == clientId)
{
s.ClientConnected -= handler;
s.ClientConnected -= Handler;
tcs.SetResult(null);
}
};
}

s.ClientConnected += handler;
s.ClientConnected += Handler;

return tcs.Task;
}


+ 5
- 6
Tests/MQTTnet.TestApp.NetCore/Program.cs Visa fil

@@ -46,12 +46,11 @@ namespace MQTTnet.TestApp.NetCore

try
{
var options = new MqttClientOptions
var options = new MqttClientWebSocketOptions
{
Server = "localhost",
Uri = "localhost",
ClientId = "XYZ",
CleanSession = true,
ConnectionType = MqttConnectionType.Ws
CleanSession = true
};

var client = new MqttClientFactory().CreateMqttClient(options);
@@ -84,7 +83,7 @@ namespace MQTTnet.TestApp.NetCore

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch
{
@@ -94,7 +93,7 @@ namespace MQTTnet.TestApp.NetCore

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch (Exception exception)
{


+ 3
- 3
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs Visa fil

@@ -32,7 +32,7 @@ namespace MQTTnet.TestApp.NetFramework
{
try
{
var options = new MqttClientOptions
var options = new MqttClientTcpOptions
{
Server = "localhost",
ClientId = "XYZ",
@@ -64,7 +64,7 @@ namespace MQTTnet.TestApp.NetFramework

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch
{
@@ -74,7 +74,7 @@ namespace MQTTnet.TestApp.NetFramework

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch (Exception exception)
{


+ 11
- 6
Tests/MQTTnet.TestApp.NetFramework/Program.cs Visa fil

@@ -52,13 +52,18 @@ namespace MQTTnet.TestApp.NetFramework

try
{
var options = new MqttClientOptions
var options = new MqttClientWebSocketOptions
{
Server = "localhost",
ClientId = "XYZ",
CleanSession = true
Uri = "broker.hivemq.com:8000/mqtt"
};

////var options = new MqttClientOptions
////{
//// Server = "localhost",
//// ClientId = "XYZ",
//// CleanSession = true
////};

var client = new MqttClientFactory().CreateMqttClient(options);
client.ApplicationMessageReceived += (s, e) =>
{
@@ -87,7 +92,7 @@ namespace MQTTnet.TestApp.NetFramework

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch
{
@@ -97,7 +102,7 @@ namespace MQTTnet.TestApp.NetFramework

try
{
await client.ConnectAsync();
await client.ConnectAsync(options);
}
catch (Exception exception)
{


+ 27
- 10
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs Visa fil

@@ -37,15 +37,32 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Connect(object sender, RoutedEventArgs e)
{
var options = new MqttClientOptions
{
Server = Server.Text,
UserName = User.Text,
Password = Password.Text,
ClientId = ClientId.Text,
TlsOptions = { UseTls = UseTls.IsChecked == true },
ConnectionType = UseTcp.IsChecked == true ? MqttConnectionType.Tcp : MqttConnectionType.Ws
};
MqttClientOptions options = null;
if (UseTcp.IsChecked == true)
{
options = new MqttClientTcpOptions
{
Server = Server.Text
};
}

if (UseWs.IsChecked == true)
{
options = new MqttClientWebSocketOptions
{
Uri = Server.Text
};
}

if (options == null)
{
throw new InvalidOperationException();
}

options.UserName = User.Text;
options.Password = Password.Text;
options.ClientId = ClientId.Text;
options.TlsOptions.UseTls = UseTls.IsChecked == true;
try
{
@@ -56,7 +73,7 @@ namespace MQTTnet.TestApp.UniversalWindows

var factory = new MqttClientFactory();
_mqttClient = factory.CreateMqttClient(options);
await _mqttClient.ConnectAsync();
await _mqttClient.ConnectAsync(options);
}
catch (Exception exception)
{


Laddar…
Avbryt
Spara