瀏覽代碼

Fixed a race condition in client and server.

release/3.x.x
Christian 6 年之前
父節點
當前提交
8f96acaec4
共有 14 個檔案被更改,包括 215 行新增118 行删除
  1. +2
    -0
      Build/MQTTnet.nuspec
  2. +1
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs
  3. +38
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  4. +3
    -2
      Frameworks/MQTTnet.NetStandard/Channel/IMqttChannel.cs
  5. +90
    -80
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  6. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs
  7. +5
    -8
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  8. +5
    -0
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
  9. +11
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
  10. +23
    -4
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  11. +5
    -5
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  12. +26
    -14
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  13. +4
    -0
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  14. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs

+ 2
- 0
Build/MQTTnet.nuspec 查看文件

@@ -15,6 +15,8 @@
* [Client] Added the "IsStarted" property for the managed client.
* [Client] Optimized stream buffer for UWP apps.
* [Client] Added the _BufferSize_ to the TCP options.
* [Client] Fixed a race condition which leads to exceptions when reconnecting rapidly.
* [Server] Fixed a race condition which leads to exceptions when clients are reconnecting rapidly.
* [Core] Fixed some issues in stream and socket handling.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs 查看文件

@@ -7,7 +7,7 @@ using MQTTnet.Serializer;

namespace MQTTnet.Adapter
{
public interface IMqttChannelAdapter
public interface IMqttChannelAdapter : IDisposable
{
IMqttPacketSerializer PacketSerializer { get; }



+ 38
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs 查看文件

@@ -14,11 +14,12 @@ using MQTTnet.Serializer;

namespace MQTTnet.Adapter
{
public class MqttChannelAdapter : IMqttChannelAdapter
public sealed class MqttChannelAdapter : IMqttChannelAdapter
{
private const uint ErrorOperationAborted = 0x800703E3;
private const int ReadBufferSize = 4096; // TODO: Move buffer size to config

private bool _isDisposed;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly IMqttNetLogger _logger;
private readonly IMqttChannel _channel;
@@ -34,6 +35,7 @@ namespace MQTTnet.Adapter

public Task ConnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);

return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
@@ -41,6 +43,7 @@ namespace MQTTnet.Adapter

public Task DisconnectAsync(TimeSpan timeout)
{
ThrowIfDisposed();
_logger.Trace<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);

return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
@@ -48,6 +51,8 @@ namespace MQTTnet.Adapter

public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
{
ThrowIfDisposed();

return ExecuteAndWrapExceptionAsync(async () =>
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -55,6 +60,11 @@ namespace MQTTnet.Adapter
{
foreach (var packet in packets)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

if (packet == null)
{
continue;
@@ -65,10 +75,20 @@ namespace MQTTnet.Adapter
var chunks = PacketSerializer.Serialize(packet);
foreach (var chunk in chunks)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

await _channel.SendStream.WriteAsync(chunk.Array, chunk.Offset, chunk.Count, cancellationToken).ConfigureAwait(false);
}
}

if (cancellationToken.IsCancellationRequested)
{
return;
}

if (timeout > TimeSpan.Zero)
{
await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
@@ -87,6 +107,8 @@ namespace MQTTnet.Adapter

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();

MqttBasePacket packet = null;
await ExecuteAndWrapExceptionAsync(async () =>
{
@@ -214,5 +236,20 @@ namespace MQTTnet.Adapter
throw new MqttCommunicationException(exception);
}
}

public void Dispose()
{
_isDisposed = true;
_semaphore?.Dispose();
_channel?.Dispose();
}

private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(MqttChannelAdapter));
}
}
}
}

+ 3
- 2
Frameworks/MQTTnet.NetStandard/Channel/IMqttChannel.cs 查看文件

@@ -1,9 +1,10 @@
using System.IO;
using System;
using System.IO;
using System.Threading.Tasks;

namespace MQTTnet.Channel
{
public interface IMqttChannel
public interface IMqttChannel : IDisposable
{
Stream SendStream { get; }
Stream ReceiveStream { get; }


+ 90
- 80
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs 查看文件

@@ -17,6 +17,7 @@ namespace MQTTnet.Client
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttNetLogger _logger;
@@ -24,13 +25,15 @@ namespace MQTTnet.Client
private IMqttClientOptions _options;
private bool _isReceivingPackets;
private CancellationTokenSource _cancellationTokenSource;
private Task _packetReceiverTask;
private Task _keepAliveMessageSenderTask;
private IMqttChannelAdapter _adapter;

public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{
_adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_packetDispatcher = new MqttPacketDispatcher(logger);
}

@@ -55,12 +58,12 @@ namespace MQTTnet.Client
_packetDispatcher.Reset();

_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger);
_logger.Trace<MqttClient>("Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Trace<MqttClient>("Connection with server established.");

await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
await StartReceivingPacketsAsync().ConfigureAwait(false);

var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Trace<MqttClient>("MQTT connection with server established.");
@@ -69,7 +72,7 @@ namespace MQTTnet.Client

if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
StartSendingKeepAliveMessages();
}

IsConnected = true;
@@ -94,7 +97,10 @@ namespace MQTTnet.Client

try
{
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
if (!_cancellationTokenSource.IsCancellationRequested)
{
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
}
}
finally
{
@@ -144,7 +150,7 @@ namespace MQTTnet.Client
ThrowIfNotConnected();

var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);
var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);

foreach (var qosGroup in packetGroups)
{
@@ -153,7 +159,7 @@ namespace MQTTnet.Client
case MqttQualityOfServiceLevel.AtMostOnce:
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(qosGroup.ToArray()).ConfigureAwait(false);
await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false);
break;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
@@ -194,6 +200,9 @@ namespace MQTTnet.Client
public void Dispose()
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_adapter?.Dispose();
}

private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
@@ -229,23 +238,31 @@ namespace MQTTnet.Client

private async Task DisconnectInternalAsync(Exception exception)
{
await _disconnectLock.WaitAsync();
var clientWasConnected = IsConnected;
IsConnected = false;

var cts = _cancellationTokenSource;
if (cts == null || cts.IsCancellationRequested)
try
{
return;
}
IsConnected = false;

cts.Cancel(false);
cts.Dispose();
_cancellationTokenSource = null;
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
return;
}

_cancellationTokenSource.Cancel(false);

if (_packetReceiverTask != null)
{
Task.WaitAll(_packetReceiverTask);
}

if (_keepAliveMessageSenderTask != null)
{
Task.WaitAll(_keepAliveMessageSenderTask);
}

try
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false);
_logger.Info<MqttClient>("Disconnected from adapter.");
_logger.Trace<MqttClient>("Disconnected from adapter.");
}
catch (Exception adapterException)
{
@@ -253,6 +270,14 @@ namespace MQTTnet.Client
}
finally
{
_adapter?.Dispose();
_adapter = null;

_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_disconnectLock.Release();
_logger.Info<MqttClient>("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
@@ -263,7 +288,7 @@ namespace MQTTnet.Client
try
{
_logger.Info<MqttClient>("Received <<< {0}", packet);
if (packet is MqttPublishPacket publishPacket)
{
await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false);
@@ -311,6 +336,11 @@ namespace MQTTnet.Client

private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
return Task.FromResult(0);
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
FireApplicationMessageReceivedEvent(publishPacket);
@@ -363,54 +393,48 @@ namespace MQTTnet.Client
return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
}

private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
private async Task SendKeepAliveMessagesAsync()
{
_logger.Info<MqttClient>("Start sending keep alive packets.");

try
{
while (!cancellationToken.IsCancellationRequested)
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
TimeSpan keepAliveSendInterval;
var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
if (_options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
}
else
{
keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
}

if (_sendTracker.Elapsed > keepAliveSendInterval)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
}
await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
}
catch (Exception exception)
{
if (_cancellationTokenSource.Token.IsCancellationRequested)
{
return;
}

await DisconnectInternalAsync(null).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
if (cancellationToken.IsCancellationRequested)
if (exception is MqttCommunicationException)
{
return;
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
}
else
{
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");

}

_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
}
finally
@@ -419,48 +443,46 @@ namespace MQTTnet.Client
}
}

private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
private async Task ReceivePacketsAsync()
{
_logger.Info<MqttClient>("Start receiving packets.");

try
{
while (!cancellationToken.IsCancellationRequested)
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
_isReceivingPackets = true;

var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
if (_cancellationTokenSource.Token.IsCancellationRequested)
{
return;
}

StartProcessReceivedPacket(packet, cancellationToken);
StartProcessReceivedPacket(packet);
}
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
}
catch (Exception exception)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
return;
}

await DisconnectInternalAsync(null).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
if (cancellationToken.IsCancellationRequested)
if (exception is MqttCommunicationException)
{
return;
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
}
else
{
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");

}

_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync(exception).ConfigureAwait(false);
}
finally
@@ -469,38 +491,26 @@ namespace MQTTnet.Client
}
}

private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
private void StartProcessReceivedPacket(MqttBasePacket packet)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(
async () => await ProcessReceivedPacketAsync(packet).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => ProcessReceivedPacketAsync(packet), _cancellationTokenSource.Token);
}

private async Task StartReceivingPacketsAsync(CancellationToken cancellationToken)
private async Task StartReceivingPacketsAsync()
{
_isReceivingPackets = false;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(
async () => await ReceivePacketsAsync(cancellationToken).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_packetReceiverTask = Task.Run(ReceivePacketsAsync, _cancellationTokenSource.Token);

while (!_isReceivingPackets && !cancellationToken.IsCancellationRequested)
while (!_isReceivingPackets && !_cancellationTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationTokenSource.Token).ConfigureAwait(false);
}
}

private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
private void StartSendingKeepAliveMessages()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(
async () => await SendKeepAliveMessagesAsync(cancellationToken).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_keepAliveMessageSenderTask = Task.Run(SendKeepAliveMessagesAsync, _cancellationTokenSource.Token);
}
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs 查看文件

@@ -13,7 +13,7 @@ using MQTTnet.Client;

namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
public sealed class MqttTcpChannel : IMqttChannel
{
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global


+ 5
- 8
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs 查看文件

@@ -12,9 +12,8 @@ using MQTTnet.Client;

namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
public sealed class MqttTcpChannel : IMqttChannel
{
//todo: this can be used with min dependency NetStandard1.6
#if NET452 || NET461
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
@@ -63,7 +62,6 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

//todo: else brach can be used with min dependency NET46
#if NET452 || NET461
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false);
#else
@@ -73,7 +71,7 @@ namespace MQTTnet.Implementations
if (_options.TlsOptions.UseTls)
{
_sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback);
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
CreateStreams();
@@ -184,15 +182,15 @@ namespace MQTTnet.Implementations
return _options.TlsOptions.AllowUntrustedCertificates;
}

private static X509CertificateCollection LoadCertificates(MqttClientTcpOptions options)
private X509CertificateCollection LoadCertificates()
{
var certificates = new X509CertificateCollection();
if (options.TlsOptions.Certificates == null)
if (_options.TlsOptions.Certificates == null)
{
return certificates;
}

foreach (var certificate in options.TlsOptions.Certificates)
foreach (var certificate in _options.TlsOptions.Certificates)
{
certificates.Add(new X509Certificate2(certificate));
}
@@ -212,7 +210,6 @@ namespace MQTTnet.Implementations
stream = new NetworkStream(_socket, true);
}
//todo: if branch can be used with min dependency NetStandard1.6
#if NET452 || NET461
SendStream = new BufferedStream(stream, _bufferSize);
ReceiveStream = new BufferedStream(stream, _bufferSize);


+ 5
- 0
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs 查看文件

@@ -24,6 +24,11 @@ namespace MQTTnet.Serializer

public static async Task<MqttPacketHeader> ReadHeaderAsync(Stream stream, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return null;
}

// Wait for the next package which starts with the header. At this point there will probably
// some large delay and thus the thread should be put back to the pool (await). So ReadByte()
// is not an option here.


+ 11
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs 查看文件

@@ -16,6 +16,8 @@ namespace MQTTnet.Server
private readonly Func<Task> _timeoutCallback;
private readonly IMqttNetLogger _logger;

private Task _workerTask;

public MqttClientKeepAliveMonitor(string clientId, Func<Task> timeoutCallback, IMqttNetLogger logger)
{
_clientId = clientId;
@@ -34,7 +36,15 @@ namespace MQTTnet.Server
return;
}

Task.Run(async () => await RunAsync(keepAlivePeriod, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
_workerTask = Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken).ConfigureAwait(false), cancellationToken);
}

public void WaitForCompletion()
{
if (_workerTask != null)
{
Task.WaitAll(_workerTask);
}
}

private async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)


+ 23
- 4
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs 查看文件

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
@@ -18,6 +19,8 @@ namespace MQTTnet.Server
private readonly MqttClientSession _clientSession;
private readonly IMqttNetLogger _logger;

private Task _workerTask;

public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
@@ -36,7 +39,15 @@ namespace MQTTnet.Server
return;
}

Task.Run(async () => await SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken).ConfigureAwait(false);
_workerTask = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
}

public void WaitForCompletion()
{
if (_workerTask != null)
{
Task.WaitAll(_workerTask);
}
}

public void Enqueue(MqttBasePacket packet)
@@ -55,7 +66,7 @@ namespace MQTTnet.Server
{
while (!cancellationToken.IsCancellationRequested)
{
await SendQueuedPacketAsync(adapter, cancellationToken);
await SendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -67,7 +78,7 @@ namespace MQTTnet.Server
}
}

private async Task SendQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
private async Task SendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
MqttBasePacket packet = null;
try
@@ -78,6 +89,11 @@ namespace MQTTnet.Server
throw new InvalidOperationException(); // should not happen
}

if (cancellationToken.IsCancellationRequested)
{
return;
}

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);

_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
@@ -110,7 +126,10 @@ namespace MQTTnet.Server
}
}

await _clientSession.StopAsync();
if (!cancellationToken.IsCancellationRequested)
{
await _clientSession.StopAsync().ConfigureAwait(false);
}
}
}



+ 5
- 5
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs 查看文件

@@ -96,14 +96,14 @@ namespace MQTTnet.Server
}

_cancellationTokenSource?.Cancel(false);

PendingMessagesQueue.WaitForCompletion();
KeepAliveMonitor.WaitForCompletion();

_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
_adapter = null;
}
_adapter = null;

_logger.Info<MqttClientSession>("Client '{0}': Session stopped.", ClientId);
}


+ 26
- 14
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs 查看文件

@@ -33,16 +33,18 @@ namespace MQTTnet.Server
public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; }
public Action<string, string> ClientUnsubscribedTopicCallback { get; set; }
public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; }
public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
MqttClientSession clientSession = null;
try
{
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket))
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken)
.ConfigureAwait(false) is MqttConnectPacket connectPacket))
{
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
throw new MqttProtocolViolationException(
"The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
}

clientId = connectPacket.ClientId;
@@ -53,10 +55,13 @@ namespace MQTTnet.Server
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
{
ConnectReturnCode = connectReturnCode
}}).ConfigureAwait(false);
new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}
}).ConfigureAwait(false);

return;
}
@@ -64,11 +69,14 @@ namespace MQTTnet.Server
var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
clientSession = result.Session;

await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[]
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = result.IsExistingSession
}}).ConfigureAwait(false);
new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = result.IsExistingSession
}
}).ConfigureAwait(false);

ClientConnectedCallback?.Invoke(new ConnectedMqttClient
{
@@ -78,6 +86,9 @@ namespace MQTTnet.Server

await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error<MqttClientSessionsManager>(exception, exception.Message);
@@ -87,12 +98,13 @@ namespace MQTTnet.Server
try
{
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
clientAdapter.Dispose();
}
catch (Exception)
catch (Exception exception)
{
// ignored
_logger.Error<MqttClientSessionsManager>(exception, exception.Message);
}
ClientDisconnectedCallback?.Invoke(new ConnectedMqttClient
{
ClientId = clientId,
@@ -308,7 +320,7 @@ namespace MQTTnet.Server
ClientSubscribedTopicCallback = null;
ClientUnsubscribedTopicCallback = null;
ApplicationMessageReceivedCallback = null;
_semaphore?.Dispose();
}
}

+ 4
- 0
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs 查看文件

@@ -17,6 +17,10 @@ namespace MQTTnet.Core.Tests

public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

public void Dispose()
{
}

public Task ConnectAsync(TimeSpan timeout)
{
return Task.FromResult(0);


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs 查看文件

@@ -89,7 +89,7 @@ namespace MQTTnet.TestApp.NetCore
c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
}
}
catch (Exception e)
catch (Exception)
{
}
};


Loading…
取消
儲存