Browse Source

Fix several deadlocks

release/3.x.x
Christian 6 years ago
parent
commit
9e90111b37
20 changed files with 428 additions and 288 deletions
  1. +3
    -0
      Build/MQTTnet.nuspec
  2. +3
    -2
      Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs
  3. +5
    -3
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  4. +44
    -39
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  5. +6
    -0
      Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs
  6. +12
    -31
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs
  7. +16
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  8. +26
    -14
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
  9. +5
    -1
      Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs
  10. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  11. +57
    -51
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  12. +89
    -101
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  13. +6
    -7
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
  14. +10
    -15
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  15. +1
    -1
      README.md
  16. +1
    -13
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  17. +7
    -5
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
  18. +3
    -2
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  19. +5
    -0
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  20. +128
    -0
      Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs

+ 3
- 0
Build/MQTTnet.nuspec View File

@@ -12,6 +12,9 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> * [Client] Added new overloads for the message builder.
* [Core] Performance optimizations (thanks to @ israellot)
* [Core] Fixed a memory leak which was caused by not properly stopped async tasks.
* [Client] Fixed a deadlock when connecting to a not reachable server.
* [Server] Fixed a deadlock when reusing the same _ClientId_ while a will message is used (thanks to @william-wps).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 3
- 2
Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs View File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Packets;
@@ -12,9 +13,9 @@ namespace MQTTnet.Adapter

Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken);

Task DisconnectAsync(TimeSpan timeout);
Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken);

Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets);
Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken);

Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken);
}


+ 5
- 3
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Runtime.InteropServices;
@@ -40,16 +41,16 @@ namespace MQTTnet.Adapter
Internal.TaskExtensions.TimeoutAfter(ct => _channel.ConnectAsync(ct), timeout, cancellationToken));
}

public Task DisconnectAsync(TimeSpan timeout)
public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
_logger.Verbose<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);

return ExecuteAndWrapExceptionAsync(() =>
Internal.TaskExtensions.TimeoutAfter(ct => _channel.DisconnectAsync(), timeout, CancellationToken.None));
Internal.TaskExtensions.TimeoutAfter(ct => _channel.DisconnectAsync(), timeout, cancellationToken));
}

public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets)
public async Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
{
ThrowIfDisposed();

@@ -215,6 +216,7 @@ namespace MQTTnet.Adapter
public void Dispose()
{
_isDisposed = true;

_channel?.Dispose();
}



+ 44
- 39
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs View File

@@ -18,12 +18,12 @@ namespace MQTTnet.Client
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();

private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttNetLogger _logger;

private IMqttClientOptions _options;
private bool _isReceivingPackets;
private CancellationTokenSource _cancellationTokenSource;
private Task _packetReceiverTask;
private Task _keepAliveMessageSenderTask;
@@ -33,8 +33,6 @@ namespace MQTTnet.Client
{
_adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_packetDispatcher = new MqttPacketDispatcher(logger);
}

public event EventHandler<MqttClientConnectedEventArgs> Connected;
@@ -63,7 +61,7 @@ namespace MQTTnet.Client
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose<MqttClient>("Connection with server established.");

await StartReceivingPacketsAsync().ConfigureAwait(false);
StartReceivingPackets(_cancellationTokenSource.Token);

var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Verbose<MqttClient>("MQTT connection with server established.");
@@ -72,7 +70,7 @@ namespace MQTTnet.Client

if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages();
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
}

IsConnected = true;
@@ -271,9 +269,9 @@ namespace MQTTnet.Client

if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
_logger.Verbose<MqttClient>("Disconnected from adapter.");
}
catch (Exception adapterException)
@@ -383,30 +381,45 @@ namespace MQTTnet.Client
private Task SendAsync(params MqttBasePacket[] packets)
{
_sendTracker.Restart();
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packets);
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, packets, _cancellationTokenSource.Token);
}

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
ushort? identifier = null;
if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier)
_sendTracker.Restart();

ushort identifier = 0;
if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
{
identifier = requestPacketWithIdentifier.PacketIdentifier;
identifier = packetWithIdentifier.PacketIdentifier.Value;
}

var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout);
await SendAsync(requestPacket).ConfigureAwait(false);
var packetAwaiter = _packetDispatcher.AddPacketAwaiter<TResponsePacket>(identifier);
try
{
await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, _cancellationTokenSource.Token).ConfigureAwait(false);
var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);

return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
return (TResponsePacket)respone;
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning<MqttPacketDispatcher>($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'.");
throw;
}
finally
{
_packetDispatcher.RemovePacketAwaiter<TResponsePacket>(identifier);
}
}

private async Task SendKeepAliveMessagesAsync()
private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
_logger.Verbose<MqttClient>("Start sending keep alive packets.");

try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
if (_options.KeepAliveSendInterval.HasValue)
@@ -419,7 +432,7 @@ namespace MQTTnet.Client
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
}

await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false);
await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -435,7 +448,7 @@ namespace MQTTnet.Client
{
_logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
}
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
finally
@@ -444,24 +457,22 @@ namespace MQTTnet.Client
}
}

private async Task ReceivePacketsAsync()
private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
_logger.Verbose<MqttClient>("Start receiving packets.");

try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
_isReceivingPackets = true;
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false);

if (_cancellationTokenSource.Token.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
return;
}

StartProcessReceivedPacket(packet);
StartProcessReceivedPacket(packet, cancellationToken);
}
}
catch (Exception exception)
@@ -479,6 +490,7 @@ namespace MQTTnet.Client
}

await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
_packetDispatcher.Dispatch(exception);
}
finally
{
@@ -486,26 +498,19 @@ namespace MQTTnet.Client
}
}

private void StartProcessReceivedPacket(MqttBasePacket packet)
private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
{
Task.Run(() => ProcessReceivedPacketAsync(packet), _cancellationTokenSource.Token);
Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken);
}

private async Task StartReceivingPacketsAsync()
private void StartReceivingPackets(CancellationToken cancellationToken)
{
_isReceivingPackets = false;

_packetReceiverTask = Task.Run(ReceivePacketsAsync, _cancellationTokenSource.Token);

while (!_isReceivingPackets && !_cancellationTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationTokenSource.Token).ConfigureAwait(false);
}
_packetReceiverTask = Task.Run(() => ReceivePacketsAsync(cancellationToken), cancellationToken);
}

private void StartSendingKeepAliveMessages()
private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
{
_keepAliveMessageSenderTask = Task.Run(SendKeepAliveMessagesAsync, _cancellationTokenSource.Token);
_keepAliveMessageSenderTask = Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken);
}
}
}

+ 6
- 0
Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs View File

@@ -36,6 +36,12 @@ namespace MQTTnet.Client
return this;
}

public MqttClientOptionsBuilder WithKeepAliveSendInterval(TimeSpan value)
{
_options.KeepAliveSendInterval = value;
return this;
}

public MqttClientOptionsBuilder WithClientId(string value)
{
_options.ClientId = value;


+ 12
- 31
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs View File

@@ -1,9 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Packets;

namespace MQTTnet.Client
@@ -11,28 +8,12 @@ namespace MQTTnet.Client
public class MqttPacketDispatcher
{
private readonly ConcurrentDictionary<Tuple<ushort, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort, Type>, TaskCompletionSource<MqttBasePacket>>();
private readonly IMqttNetLogger _logger;

public MqttPacketDispatcher(IMqttNetLogger logger)
public void Dispatch(Exception exception)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task<MqttBasePacket> WaitForPacketAsync(Type responseType, ushort? identifier, TimeSpan timeout)
{
var packetAwaiter = AddPacketAwaiter(responseType, identifier);
try
{
return await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, timeout, CancellationToken.None).ConfigureAwait(false);
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning<MqttPacketDispatcher>("Timeout while waiting for packet of type '{0}'.", responseType.Name);
throw;
}
finally
foreach (var awaiter in _awaiters)
{
RemovePacketAwaiter(responseType, identifier);
awaiter.Value.SetException(exception);
}
}

@@ -63,7 +44,7 @@ namespace MQTTnet.Client
_awaiters.Clear();
}

private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(Type responseType, ushort? identifier)
public TaskCompletionSource<MqttBasePacket> AddPacketAwaiter<TResponsePacket>(ushort? identifier) where TResponsePacket : MqttBasePacket
{
var tcs = new TaskCompletionSource<MqttBasePacket>();

@@ -71,25 +52,25 @@ namespace MQTTnet.Client
{
identifier = 0;
}
var dictionaryKey = new Tuple<ushort, Type>(identifier ?? 0, responseType);
if (!_awaiters.TryAdd(dictionaryKey, tcs))
var key = new Tuple<ushort, Type>(identifier ?? 0, typeof(TResponsePacket));
if (!_awaiters.TryAdd(key, tcs))
{
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}.");
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{key.Item2.Name}' with identifier {key.Item1}.");
}

return tcs;
}

private void RemovePacketAwaiter(Type responseType, ushort? identifier)
public void RemovePacketAwaiter<TResponsePacket>(ushort? identifier) where TResponsePacket : MqttBasePacket
{
if (!identifier.HasValue)
{
identifier = 0;
}

var dictionaryKey = new Tuple<ushort, Type>(identifier ?? 0, responseType);
_awaiters.TryRemove(dictionaryKey, out var _);
var key = new Tuple<ushort, Type>(identifier ?? 0, typeof(TResponsePacket));
_awaiters.TryRemove(key, out var _);
}
}
}

+ 16
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -10,6 +10,7 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttChannel
{
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
private readonly MqttClientWebSocketOptions _options;

private WebSocket _webSocket;
@@ -96,13 +97,26 @@ namespace MQTTnet.Implementations
return response.Count;
}

public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken);
// This lock is required because the client will throw an exception if _SendAsync_ is
// called from multiple threads at the same time. But this issue only happens with several
// framework versions.
await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false);
}
finally
{
_sendLock.Release();
}
}

public void Dispose()
{
_sendLock?.Dispose();

try
{
_webSocket?.Dispose();


+ 26
- 14
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs View File

@@ -11,7 +11,7 @@ namespace MQTTnet.Serializer
public sealed class MqttPacketSerializer : IMqttPacketSerializer
{
private static byte[] ProtocolVersionV311Name { get; } = Encoding.UTF8.GetBytes("MQTT");
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIsdp");

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

@@ -203,22 +203,30 @@ namespace MQTTnet.Serializer

private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
{
reader.ReadBytes(2); // Skip 2 bytes
reader.ReadBytes(2); // Skip 2 bytes for header and remaining length.

MqttProtocolVersion protocolVersion;
var protocolName = reader.ReadBytes(4);
if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{
reader.ReadBytes(2);
protocolVersion = MqttProtocolVersion.V310;
}
else if (protocolName.SequenceEqual(ProtocolVersionV311Name))

if (protocolName.SequenceEqual(ProtocolVersionV311Name))
{
protocolVersion = MqttProtocolVersion.V311;
}
else
{
throw new MqttProtocolViolationException("Protocol name is not supported.");
var buffer = new byte[6];
Array.Copy(protocolName, buffer, 4);
protocolName = reader.ReadBytes(2);
Array.Copy(protocolName, 0, buffer, 4, 2);

if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{
protocolVersion = MqttProtocolVersion.V310;
}
else
{
throw new MqttProtocolViolationException("Protocol name is not supported.");
}
}

reader.ReadByte(); // Skip protocol level
@@ -323,16 +331,15 @@ namespace MQTTnet.Serializer
ValidateConnectPacket(packet);

// Write variable header
writer.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
if (ProtocolVersion == MqttProtocolVersion.V311)
{
writer.Write(ProtocolVersionV311Name);
writer.Write(0x04); // 3.1.2.2 Protocol Level (4)
writer.WriteWithLengthPrefix(ProtocolVersionV311Name);
writer.Write(0x04); // 3.1.2.2 Protocol Level 4
}
else
{
writer.Write(ProtocolVersionV310Name);
writer.Write(0x64, 0x70, 0x03); // Protocol Level (0x03)
writer.WriteWithLengthPrefix(ProtocolVersionV310Name);
writer.Write(0x03); // Protocol Level 3
}

var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
@@ -351,6 +358,11 @@ namespace MQTTnet.Serializer
connectFlags.Write(false);
}

if (packet.Password != null && packet.Username == null)
{
throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
}

connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null);



+ 5
- 1
Frameworks/MQTTnet.NetStandard/Server/ConnectedMqttClient.cs View File

@@ -3,11 +3,15 @@ using MQTTnet.Serializer;

namespace MQTTnet.Server
{
// TODO: Rename to "RegisteredClient"
// TODO: Add IsConnected
// TODO: Add interface

public class ConnectedMqttClient
{
public string ClientId { get; set; }

public MqttProtocolVersion ProtocolVersion { get; set; }
public MqttProtocolVersion? ProtocolVersion { get; set; }

public TimeSpan LastPacketReceived { get; set; }



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -93,7 +93,7 @@ namespace MQTTnet.Server
return;
}

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false);

_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
}


+ 57
- 51
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -16,11 +16,11 @@ namespace MQTTnet.Server
public sealed class MqttClientSession : IDisposable
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger;
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttNetLogger _logger;
private readonly IMqttServerOptions _options;
private readonly MqttClientSessionsManager _sessionsManager;

private IMqttChannelAdapter _adapter;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
private bool _wasCleanDisconnect;
@@ -28,22 +28,22 @@ namespace MQTTnet.Server
public MqttClientSession(
string clientId,
IMqttServerOptions options,
MqttClientSessionsManager sessionsManager,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttNetLogger logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_sessionsManager = sessionsManager;
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

ClientId = clientId;

KeepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, StopDueToKeepAliveTimeoutAsync, _logger);
SubscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId);
SubscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server);
PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
}

public Func<MqttClientSession, MqttApplicationMessage, Task> ApplicationMessageReceivedCallback { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }

public MqttClientPendingMessagesQueue PendingMessagesQueue { get; }
@@ -52,9 +52,9 @@ namespace MQTTnet.Server

public string ClientId { get; }

public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;
public MqttProtocolVersion? ProtocolVersion { get; private set; }

public bool IsConnected => _adapter != null;
public bool IsConnected { get; private set; }

public async Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{
@@ -63,66 +63,73 @@ namespace MQTTnet.Server

try
{
var cancellationTokenSource = new CancellationTokenSource();
_cancellationTokenSource = new CancellationTokenSource();

_wasCleanDisconnect = false;
_willMessage = connectPacket.WillMessage;
_adapter = adapter;
_cancellationTokenSource = cancellationTokenSource;

PendingMessagesQueue.Start(adapter, cancellationTokenSource.Token);
KeepAliveMonitor.Start(connectPacket.KeepAlivePeriod, cancellationTokenSource.Token);
IsConnected = true;
ProtocolVersion = adapter.PacketSerializer.ProtocolVersion;

PendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token);
KeepAliveMonitor.Start(connectPacket.KeepAlivePeriod, _cancellationTokenSource.Token);

await ReceivePacketsAsync(adapter, cancellationTokenSource.Token).ConfigureAwait(false);
await ReceivePacketsAsync(adapter, _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
_logger.Warning<MqttClientSession>(exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
_logger.Warning<MqttClientSession>(exception,
"Client '{0}': Communication exception while processing client packets.", ClientId);
}
catch (Exception exception)
{
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
_logger.Error<MqttClientSession>(exception,
"Client '{0}': Unhandled exception while processing client packets.", ClientId);
}
finally
{
ProtocolVersion = null;
IsConnected = false;

_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}

return _wasCleanDisconnect;
}

public async Task StopAsync(bool wasCleanDisconnect = false)
public Task StopAsync(bool wasCleanDisconnect = false)
{
try
{
if (_cancellationTokenSource == null)
{
return;
return Task.FromResult(0);
}

_wasCleanDisconnect = wasCleanDisconnect;

_cancellationTokenSource?.Cancel(false);

PendingMessagesQueue.WaitForCompletion();
KeepAliveMonitor.WaitForCompletion();

_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_adapter = null;

_logger.Info<MqttClientSession>("Client '{0}': Session stopped.", ClientId);
}
finally
{
var willMessage = _willMessage;
_willMessage = null; // clear willmessage so it is send just once

if (willMessage != null && !wasCleanDisconnect)
if (_willMessage != null && !wasCleanDisconnect)
{
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false);
_sessionsManager.StartDispatchApplicationMessage(this, willMessage);
}
}
finally
{
_logger.Info<MqttClientSession>("Client '{0}': Session stopped.", ClientId);
}

return Task.FromResult(0);
}

public async Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
@@ -170,8 +177,6 @@ namespace MQTTnet.Server

public void Dispose()
{
ApplicationMessageReceivedCallback = null;

SubscriptionsManager?.Dispose();
PendingMessagesQueue?.Dispose();

@@ -219,7 +224,7 @@ namespace MQTTnet.Server

if (packet is MqttPingReqPacket)
{
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttPingRespPacket() });
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { new MqttPingRespPacket() }, cancellationToken);
}

if (packet is MqttPubRelPacket pubRelPacket)
@@ -234,7 +239,7 @@ namespace MQTTnet.Server
PacketIdentifier = pubRecPacket.PacketIdentifier
};

return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { responsePacket });
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { responsePacket }, cancellationToken);
}

if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
@@ -267,10 +272,19 @@ namespace MQTTnet.Server
return StopAsync();
}

private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
{
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters);
foreach (var applicationMessage in retainedMessages)
{
await EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
}
}

private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{
var subscribeResult = await SubscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { subscribeResult.ResponsePacket }).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { subscribeResult.ResponsePacket }, cancellationToken).ConfigureAwait(false);

if (subscribeResult.CloseConnection)
{
@@ -283,16 +297,7 @@ namespace MQTTnet.Server
private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
{
var unsubscribeResult = await SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { unsubscribeResult });
}

private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
{
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters);
foreach (var applicationMessage in retainedMessages)
{
await EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
}
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { unsubscribeResult }, cancellationToken);
}

private Task HandleIncomingPublishPacketAsync(IMqttChannelAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
@@ -303,7 +308,8 @@ namespace MQTTnet.Server
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
return ApplicationMessageReceivedCallback?.Invoke(this, applicationMessage);
_sessionsManager.StartDispatchApplicationMessage(this, applicationMessage);
return Task.FromResult(0);
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
@@ -322,25 +328,25 @@ namespace MQTTnet.Server

private async Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);
_sessionsManager.StartDispatchApplicationMessage(this, applicationMessage);

var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response }).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { response }, cancellationToken).ConfigureAwait(false);
}

private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);
_sessionsManager.StartDispatchApplicationMessage(this, applicationMessage);

var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response }).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { response }, cancellationToken).ConfigureAwait(false);
}

private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
{
var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier };
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response });
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { response }, cancellationToken);
}
}
}

+ 89
- 101
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -8,38 +8,34 @@ using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Serializer;

namespace MQTTnet.Server
{
public sealed class MqttClientSessionsManager : IDisposable
{
private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _sessionsLock = new SemaphoreSlim(1, 1);

private readonly IMqttServerOptions _options;
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger;

public MqttClientSessionsManager(IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options));
Server = server ?? throw new ArgumentNullException(nameof(server));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
}

public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; }
public Action<ConnectedMqttClient, bool> ClientDisconnectedCallback { get; set; }
public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; }
public Action<string, string> ClientUnsubscribedTopicCallback { get; set; }
public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; }
public MqttServer Server { get; }

public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
var wasCleanDisconnect = false;
MqttClientSession clientSession = null;
try
{
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
@@ -57,30 +53,30 @@ namespace MQTTnet.Server
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[]
{
new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}
}).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);

return;
}

var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
var result = await PrepareClientSessionAsync(connectPacket).ConfigureAwait(false);
clientSession = result.Session;

await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[]
{
new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = result.IsExistingSession
}
}).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);

ClientConnectedCallback?.Invoke(new ConnectedMqttClient
Server.OnClientConnected(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
@@ -99,15 +95,15 @@ namespace MQTTnet.Server
{
try
{
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
clientAdapter.Dispose();
}
catch (Exception exception)
{
_logger.Error<MqttClientSessionsManager>(exception, exception.Message);
}
ClientDisconnectedCallback?.Invoke(new ConnectedMqttClient
Server.OnClientDisconnected(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion,
@@ -119,7 +115,7 @@ namespace MQTTnet.Server

public async Task StopAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
foreach (var session in _sessions)
@@ -131,19 +127,19 @@ namespace MQTTnet.Server
}
finally
{
_semaphore.Release();
_sessionsLock.Release();
}
}

public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311,
ProtocolVersion = s.Value.ProtocolVersion,
LastPacketReceived = s.Value.KeepAliveMonitor.LastPacketReceived,
LastNonKeepAlivePacketReceived = s.Value.KeepAliveMonitor.LastNonKeepAlivePacketReceived,
PendingApplicationMessages = s.Value.PendingMessagesQueue.Count
@@ -151,49 +147,13 @@ namespace MQTTnet.Server
}
finally
{
_semaphore.Release();
_sessionsLock.Release();
}
}

public async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
public void StartDispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
try
{
var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
if (interceptorContext.CloseConnection)
{
await senderClientSession.StopAsync().ConfigureAwait(false);
}

if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{
return;
}

if (applicationMessage.Retain)
{
await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
}

ApplicationMessageReceivedCallback?.Invoke(senderClientSession?.ClientId, applicationMessage);
}
catch (Exception exception)
{
_logger.Error<MqttClientSessionsManager>(exception, "Error while processing application message");
}

await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
foreach (var clientSession in _sessions.Values)
{
await clientSession.EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
}
}
finally
{
_semaphore.Release();
}
Task.Run(() => DispatchApplicationMessageAsync(senderClientSession, applicationMessage));
}

public async Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters)
@@ -201,19 +161,19 @@ namespace MQTTnet.Server
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

await _semaphore.WaitAsync().ConfigureAwait(false);
await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
if (!_sessions.TryGetValue(clientId, out var session))
{
throw new InvalidOperationException($"Client session {clientId} is unknown.");
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}

await session.SubscribeAsync(topicFilters);
await session.SubscribeAsync(topicFilters).ConfigureAwait(false);
}
finally
{
_semaphore.Release();
_sessionsLock.Release();
}
}

@@ -222,36 +182,25 @@ namespace MQTTnet.Server
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

await _semaphore.WaitAsync().ConfigureAwait(false);
await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
if (!_sessions.TryGetValue(clientId, out var session))
{
throw new InvalidOperationException($"Client session {clientId} is unknown.");
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}

await session.UnsubscribeAsync(topicFilters);
await session.UnsubscribeAsync(topicFilters).ConfigureAwait(false);
}
finally
{
_semaphore.Release();
_sessionsLock.Release();
}
}

private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
public void Dispose()
{
var interceptorContext = new MqttApplicationMessageInterceptorContext(
senderClientSession?.ClientId,
applicationMessage);

var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
{
return interceptorContext;
}
interceptor(interceptorContext);
return interceptorContext;
_sessionsLock?.Dispose();
}

private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
@@ -271,9 +220,9 @@ namespace MQTTnet.Server
return context.ReturnCode;
}

private async Task<GetOrCreateClientSessionResult> GetOrCreateClientSessionAsync(MqttConnectPacket connectPacket)
private async Task<GetOrCreateClientSessionResult> PrepareClientSessionAsync(MqttConnectPacket connectPacket)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
@@ -300,14 +249,7 @@ namespace MQTTnet.Server
{
isExistingSession = false;

clientSession = new MqttClientSession(connectPacket.ClientId, _options, _retainedMessagesManager, _logger)
{
ApplicationMessageReceivedCallback = DispatchApplicationMessageAsync
};

clientSession.SubscriptionsManager.TopicSubscribedCallback = ClientSubscribedTopicCallback;
clientSession.SubscriptionsManager.TopicUnsubscribedCallback = ClientUnsubscribedTopicCallback;

clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger);
_sessions[connectPacket.ClientId] = clientSession;

_logger.Verbose<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId);
@@ -317,19 +259,65 @@ namespace MQTTnet.Server
}
finally
{
_semaphore.Release();
_sessionsLock.Release();
}
}

public void Dispose()
private async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
try
{
var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage);
if (interceptorContext.CloseConnection)
{
await senderClientSession.StopAsync().ConfigureAwait(false);
}

if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish)
{
return;
}

if (applicationMessage.Retain)
{
await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
}

Server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage);
}
catch (Exception exception)
{
_logger.Error<MqttClientSessionsManager>(exception, "Error while processing application message");
}

await _sessionsLock.WaitAsync().ConfigureAwait(false);
try
{
foreach (var clientSession in _sessions.Values)
{
await clientSession.EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
}
}
finally
{
_sessionsLock.Release();
}
}

private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
ClientConnectedCallback = null;
ClientDisconnectedCallback = null;
ClientSubscribedTopicCallback = null;
ClientUnsubscribedTopicCallback = null;
ApplicationMessageReceivedCallback = null;
var interceptorContext = new MqttApplicationMessageInterceptorContext(
senderClientSession?.ClientId,
applicationMessage);

_semaphore?.Dispose();
var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
{
return interceptorContext;
}

interceptor(interceptorContext);
return interceptorContext;
}
}
}

+ 6
- 7
Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs View File

@@ -13,17 +13,16 @@ namespace MQTTnet.Server
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IMqttServerOptions _options;
private readonly MqttServer _server;
private readonly string _clientId;

public MqttClientSubscriptionsManager(IMqttServerOptions options, string clientId)
public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServer server)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_options = options ?? throw new ArgumentNullException(nameof(options));
_server = server;
}

public Action<string, TopicFilter> TopicSubscribedCallback { get; set; }
public Action<string, string> TopicUnsubscribedCallback { get; set; }

public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttSubscribePacket subscribePacket)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
@@ -61,7 +60,7 @@ namespace MQTTnet.Server
if (interceptorContext.AcceptSubscription)
{
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
TopicSubscribedCallback?.Invoke(_clientId, topicFilter);
_server.OnClientSubscribedTopic(_clientId, topicFilter);
}
}
}
@@ -83,7 +82,7 @@ namespace MQTTnet.Server
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
_subscriptions.Remove(topicFilter);
TopicUnsubscribedCallback?.Invoke(_clientId, topicFilter);
_server.OnClientUnsubscribedTopic(_clientId, topicFilter);
}
}
finally


+ 10
- 15
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -57,7 +57,7 @@ namespace MQTTnet.Server
return _clientSessionsManager.UnsubscribeAsync(clientId, topicFilters);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

@@ -65,8 +65,10 @@ namespace MQTTnet.Server

foreach (var applicationMessage in applicationMessages)
{
await _clientSessionsManager.DispatchApplicationMessageAsync(null, applicationMessage);
_clientSessionsManager.StartDispatchApplicationMessage(null, applicationMessage);
}

return Task.FromResult(0);
}

public async Task StartAsync(IMqttServerOptions options)
@@ -80,14 +82,7 @@ namespace MQTTnet.Server
_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync();

_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _logger)
{
ClientConnectedCallback = OnClientConnected,
ClientDisconnectedCallback = OnClientDisconnected,
ClientSubscribedTopicCallback = OnClientSubscribedTopic,
ClientUnsubscribedTopicCallback = OnClientUnsubscribedTopic,
ApplicationMessageReceivedCallback = OnApplicationMessageReceived
};
_clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _logger);

foreach (var adapter in _adapters)
{
@@ -132,29 +127,29 @@ namespace MQTTnet.Server
}
}

private void OnClientConnected(ConnectedMqttClient client)
internal void OnClientConnected(ConnectedMqttClient client)
{
_logger.Info<MqttServer>("Client '{0}': Connected.", client.ClientId);
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client));
}

private void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect)
internal void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect)
{
_logger.Info<MqttServer>("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect));
}

private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
internal void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
{
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
}

private void OnClientUnsubscribedTopic(string clientId, string topicFilter)
internal void OnClientUnsubscribedTopic(string clientId, string topicFilter)
{
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
}

private void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));
}


+ 1
- 1
README.md View File

@@ -18,7 +18,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* TLS 1.2 support for client and server (but not UWP servers)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS)
* Lightweight (only the low level implementation of MQTT, no overhead)
* Performance optimized (processing ~50.000 messages / second)*
* Performance optimized (processing ~60.000 messages / second)*
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (~80 tests)


+ 1
- 13
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
@@ -25,7 +24,7 @@ namespace MQTTnet.Core.Tests
CleanSession = true
};

SerializeAndCompare(p, "EB0ABE1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310);
SerializeAndCompare(p, "EB0ABk1RSXNkcAPCAHsAA1hZWgAEVVNFUgAEUEFTUw==", MqttProtocolVersion.V310);
}

[TestMethod]
@@ -428,17 +427,6 @@ namespace MQTTnet.Core.Tests
}
}

private static byte[] Join(IEnumerable<ArraySegment<byte>> chunks)
{
var buffer = new MemoryStream();
foreach (var chunk in chunks)
{
buffer.Write(chunk.Array, chunk.Offset, chunk.Count);
}

return buffer.ToArray();
}

private static byte[] Join(params ArraySegment<byte>[] chunks)
{
var buffer = new MemoryStream();


+ 7
- 5
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs View File

@@ -1,4 +1,6 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
@@ -11,7 +13,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions(), "");
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -32,7 +34,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
{
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions(), "");
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
@@ -53,7 +55,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
{
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions(), "");
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
@@ -75,7 +77,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions(), "");
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -94,7 +96,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{
var sm = new MqttClientSubscriptionsManager(new MqttServerOptions(), "");
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());


+ 3
- 2
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
@@ -25,12 +26,12 @@ namespace MQTTnet.Core.Tests
return Task.FromResult(0);
}

public Task DisconnectAsync(TimeSpan timeout)
public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
return Task.FromResult(0);
}

public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets)
public Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
{
ThrowIfPartnerIsNull();



+ 5
- 0
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -19,6 +19,7 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine("2 = Start server");
Console.WriteLine("3 = Start performance test");
Console.WriteLine("4 = Start managed client");
Console.WriteLine("5 = Start public broker test");

var pressedKey = Console.ReadKey(true);
if (pressedKey.KeyChar == '1')
@@ -37,6 +38,10 @@ namespace MQTTnet.TestApp.NetCore
{
Task.Run(ManagedClientTest.RunAsync);
}
else if (pressedKey.KeyChar == '5')
{
Task.Run(PublicBrokerTest.RunAsync);
}

Thread.Sleep(Timeout.Infinite);
}


+ 128
- 0
Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs View File

@@ -0,0 +1,128 @@
using MQTTnet.Client;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Protocol;
using Newtonsoft.Json;

namespace MQTTnet.TestApp.NetCore
{
public static class PublicBrokerTest
{
public static async Task RunAsync()
{
//MqttNetGlobalLogger.LogMessagePublished += (s, e) => Console.WriteLine(e.TraceMessage);

// iot.eclipse.org
await ExecuteTestAsync("iot.eclipse.org TCP",
new MqttClientOptionsBuilder().WithTcpServer("iot.eclipse.org", 1883).Build());

await ExecuteTestAsync("iot.eclipse.org WS",
new MqttClientOptionsBuilder().WithWebSocketServer("iot.eclipse.org:80/mqtt").Build());

await ExecuteTestAsync("iot.eclipse.org WS TLS",
new MqttClientOptionsBuilder().WithWebSocketServer("iot.eclipse.org:443/mqtt").WithTls().Build());

// test.mosquitto.org
await ExecuteTestAsync("test.mosquitto.org TCP",
new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 1883).Build());

await ExecuteTestAsync("test.mosquitto.org TCP TLS",
new MqttClientOptionsBuilder().WithTcpServer("test.mosquitto.org", 8883).WithTls().Build());

await ExecuteTestAsync("test.mosquitto.org WS",
new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8080/mqtt").Build());

await ExecuteTestAsync("test.mosquitto.org WS TLS",
new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").Build());

// broker.hivemq.com
await ExecuteTestAsync("broker.hivemq.com TCP",
new MqttClientOptionsBuilder().WithTcpServer("broker.hivemq.com", 1883).Build());

await ExecuteTestAsync("broker.hivemq.com WS",
new MqttClientOptionsBuilder().WithWebSocketServer("broker.hivemq.com:8000/mqtt").Build());

// mqtt.swifitch.cz
await ExecuteTestAsync("mqtt.swifitch.cz",
new MqttClientOptionsBuilder().WithTcpServer("mqtt.swifitch.cz", 1883).Build());

// CloudMQTT
var configFile = Path.Combine("E:\\CloudMqttTestConfig.json");
if (File.Exists(configFile))
{
var config = JsonConvert.DeserializeObject<MqttConfig>(File.ReadAllText(configFile));

await ExecuteTestAsync("CloudMQTT TCP",
new MqttClientOptionsBuilder().WithTcpServer(config.Server, config.Port).WithCredentials(config.Username, config.Password).Build());

await ExecuteTestAsync("CloudMQTT TCP TLS",
new MqttClientOptionsBuilder().WithTcpServer(config.Server, config.SslPort).WithCredentials(config.Username, config.Password).WithTls().Build());

await ExecuteTestAsync("CloudMQTT WS TLS",
new MqttClientOptionsBuilder().WithWebSocketServer(config.Server + ":" + config.SslWebSocketPort + "/mqtt").WithCredentials(config.Username, config.Password).WithTls().Build());
}

Write("Finished.", ConsoleColor.White);
Console.ReadLine();
}

private static async Task ExecuteTestAsync(string name, IMqttClientOptions options)
{
try
{
Write("Testing '" + name + "'... ", ConsoleColor.Gray);
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
var topic = Guid.NewGuid().ToString();

MqttApplicationMessage receivedMessage = null;
client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage;

await client.ConnectAsync(options);
await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);
await client.PublishAsync(topic, "Hello_World", MqttQualityOfServiceLevel.AtLeastOnce);

SpinWait.SpinUntil(() => receivedMessage != null, 5000);

if (receivedMessage?.Topic != topic || receivedMessage?.ConvertPayloadToString() != "Hello_World")
{
throw new Exception("Message invalid.");
}

await client.UnsubscribeAsync("test");
await client.DisconnectAsync();

Write("[OK]\n", ConsoleColor.Green);
}
catch (Exception e)
{
Write("[FAILED] " + e.Message + "\n", ConsoleColor.Red);
}
}

private static void Write(string message, ConsoleColor color)
{
Console.ForegroundColor = color;
Console.Write(message);
}

public class MqttConfig
{
public string Server { get; set; }

public string Username { get; set; }

public string Password { get; set; }

public int Port { get; set; }

public int SslPort { get; set; }

public int WebSocketPort { get; set; }

public int SslWebSocketPort { get; set; }
}
}
}

Loading…
Cancel
Save