Browse Source

Improve connection disposal.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
6399172582
20 changed files with 179 additions and 165 deletions
  1. +1
    -1
      Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs
  2. +7
    -7
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  3. +6
    -5
      Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
  4. +10
    -11
      Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  5. +51
    -0
      Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs
  6. +3
    -0
      Source/MQTTnet.Server/Configuration/SettingsModel.cs
  7. +8
    -12
      Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs
  8. +1
    -0
      Source/MQTTnet.Server/Mqtt/MqttServerService.cs
  9. +5
    -5
      Source/MQTTnet.Server/appsettings.json
  10. +1
    -1
      Source/MQTTnet/Adapter/IMqttServerAdapter.cs
  11. +0
    -17
      Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs
  12. +32
    -11
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs
  13. +11
    -5
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  14. +25
    -7
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  15. +0
    -1
      Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
  16. +2
    -9
      Source/MQTTnet/Server/MqttClientConnection.cs
  17. +3
    -7
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  18. +4
    -4
      Source/MQTTnet/Server/MqttServer.cs
  19. +9
    -1
      Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
  20. +0
    -61
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs

+ 1
- 1
Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs View File

@@ -28,7 +28,7 @@ namespace MQTTnet.AspNetCore
} }


var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>(); var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>();
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol))
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol).ConfigureAwait(false))
{ {
await adapter.RunWebSocketConnectionAsync(webSocket, context); await adapter.RunWebSocketConnectionAsync(webSocket, context);
} }


+ 7
- 7
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -24,8 +24,7 @@ namespace MQTTnet.AspNetCore
_input = Connection.Transport.Input; _input = Connection.Transport.Input;
_output = Connection.Transport.Output; _output = Connection.Transport.Output;
} }


_reader = new SpanBasedMqttPacketBodyReader(); _reader = new SpanBasedMqttPacketBodyReader();
} }


@@ -33,9 +32,10 @@ namespace MQTTnet.AspNetCore
private PipeWriter _output; private PipeWriter _output;
private readonly SpanBasedMqttPacketBodyReader _reader; private readonly SpanBasedMqttPacketBodyReader _reader;


public string Endpoint
public string Endpoint
{ {
get {
get
{
var connection = Http?.HttpContext?.Connection; var connection = Http?.HttpContext?.Connection;
if (connection == null) if (connection == null)
{ {
@@ -53,12 +53,12 @@ namespace MQTTnet.AspNetCore
public ConnectionContext Connection { get; } public ConnectionContext Connection { get; }
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }


public long BytesSent { get; set; }
public long BytesSent { get; set; }
public long BytesReceived { get; set; } public long BytesReceived { get; set; }


public Action ReadingPacketStartedCallback { get; set; } public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; } public Action ReadingPacketCompletedCallback { get; set; }
private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);


public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
@@ -145,7 +145,7 @@ namespace MQTTnet.AspNetCore
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{ {
var formatter = PacketFormatterAdapter; var formatter = PacketFormatterAdapter;


await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try try


+ 6
- 5
Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs View File

@@ -10,7 +10,7 @@ namespace MQTTnet.AspNetCore
{ {
public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter
{ {
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


public override async Task OnConnectedAsync(ConnectionContext connection) public override async Task OnConnectedAsync(ConnectionContext connection)
{ {
@@ -25,10 +25,11 @@ namespace MQTTnet.AspNetCore
var formatter = new MqttPacketFormatterAdapter(writer); var formatter = new MqttPacketFormatterAdapter(writer);
using (var adapter = new MqttConnectionContext(formatter, connection)) using (var adapter = new MqttConnectionContext(formatter, connection))
{ {
var args = new MqttServerAdapterClientAcceptedEventArgs(adapter);
ClientAcceptedHandler?.Invoke(args);

await args.SessionTask.ConfigureAwait(false);
var clientHandler = ClientHandler;
if (clientHandler != null)
{
await clientHandler(adapter).ConfigureAwait(false);
}
} }
} }




+ 10
- 11
Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -21,7 +21,7 @@ namespace MQTTnet.AspNetCore
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
} }


public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


public Task StartAsync(IMqttServerOptions options) public Task StartAsync(IMqttServerOptions options)
{ {
@@ -43,17 +43,16 @@ namespace MQTTnet.AspNetCore
var isSecureConnection = clientCertificate != null; var isSecureConnection = clientCertificate != null;
clientCertificate?.Dispose(); clientCertificate?.Dispose();


var writer = new SpanBasedMqttPacketWriter();
var formatter = new MqttPacketFormatterAdapter(writer);
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection);
var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter)));

var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(channelAdapter);
ClientAcceptedHandler?.Invoke(eventArgs);

if (eventArgs.SessionTask != null)
var clientHandler = ClientHandler;
if (clientHandler != null)
{ {
await eventArgs.SessionTask.ConfigureAwait(false);
var writer = new SpanBasedMqttPacketWriter();
var formatter = new MqttPacketFormatterAdapter(writer);
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection);
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter))))
{
await clientHandler(channelAdapter).ConfigureAwait(false);
}
} }
} }




+ 51
- 0
Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs View File

@@ -0,0 +1,51 @@
using MQTTnet.Exceptions;
using MQTTnet.Protocol;

namespace MQTTnet
{
public class TopicFilterBuilder
{
private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
private string _topic;

public TopicFilterBuilder WithTopic(string topic)
{
_topic = topic;
return this;
}

public TopicFilterBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
_qualityOfServiceLevel = qualityOfServiceLevel;
return this;
}

public TopicFilterBuilder WithAtLeastOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;
return this;
}

public TopicFilterBuilder WithAtMostOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
return this;
}

public TopicFilterBuilder WithExactlyOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce;
return this;
}

public TopicFilter Build()
{
if (string.IsNullOrEmpty(_topic))
{
throw new MqttProtocolViolationException("Topic is not set.");
}

return new TopicFilter { Topic = _topic, QualityOfServiceLevel = _qualityOfServiceLevel };
}
}
}

+ 3
- 0
Source/MQTTnet.Server/Configuration/SettingsModel.cs View File

@@ -45,6 +45,9 @@
/// </summary> /// </summary>
public RetainedApplicationMessagesModel RetainedApplicationMessages { get; set; } = new RetainedApplicationMessagesModel(); public RetainedApplicationMessagesModel RetainedApplicationMessages { get; set; } = new RetainedApplicationMessagesModel();


/// <summary>
/// Enables or disables the MQTTnet internal logging.
/// </summary>
public bool EnableDebugLogging { get; set; } = false; public bool EnableDebugLogging { get; set; } = false;
} }
} }

+ 8
- 12
Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs View File

@@ -60,13 +60,13 @@ namespace MQTTnet.Server.Configuration
{ {
if (IPv4 == "*") if (IPv4 == "*")
{ {
address = IPAddress.Parse("0.0.0.0");
address = IPAddress.Any;
return true; return true;
} }


if (IPv4 == "localhost") if (IPv4 == "localhost")
{ {
address = IPAddress.Parse("127.0.0.1");
address = IPAddress.Loopback;
return true; return true;
} }


@@ -81,10 +81,8 @@ namespace MQTTnet.Server.Configuration
address = ip; address = ip;
return true; return true;
} }
else
{
throw new System.Exception($"Could not parse IPv4 address: {IPv4}");
}

throw new System.Exception($"Could not parse IPv4 address: {IPv4}");
} }


/// <summary> /// <summary>
@@ -95,13 +93,13 @@ namespace MQTTnet.Server.Configuration
{ {
if (IPv6 == "*") if (IPv6 == "*")
{ {
address = IPAddress.Parse("::");
address = IPAddress.IPv6Any;
return true; return true;
} }


if (IPv6 == "localhost") if (IPv6 == "localhost")
{ {
address = IPAddress.Parse("::1");
address = IPAddress.IPv6Loopback;
return true; return true;
} }


@@ -116,10 +114,8 @@ namespace MQTTnet.Server.Configuration
address = ip; address = ip;
return true; return true;
} }
else
{
throw new System.Exception($"Could not parse IPv6 address: {IPv6}");
}

throw new System.Exception($"Could not parse IPv6 address: {IPv6}");
} }
} }
} }

+ 1
- 0
Source/MQTTnet.Server/Mqtt/MqttServerService.cs View File

@@ -152,6 +152,7 @@ namespace MQTTnet.Server.Mqtt
if (_settings.TcpEndPoint.Enabled) if (_settings.TcpEndPoint.Enabled)
{ {
options.WithDefaultEndpoint(); options.WithDefaultEndpoint();

if (_settings.TcpEndPoint.TryReadIPv4(out var address4)) if (_settings.TcpEndPoint.TryReadIPv4(out var address4))
{ {
options.WithDefaultEndpointBoundIPAddress(address4); options.WithDefaultEndpointBoundIPAddress(address4);


+ 5
- 5
Source/MQTTnet.Server/appsettings.json View File

@@ -18,14 +18,14 @@
*/ */
"TcpEndPoint": { "TcpEndPoint": {
"Enabled": true, "Enabled": true,
"IPv4": "localhost",
"IPv6": "localhost",
"IPv4": "*",
"IPv6": "*",
"Port": 1883 "Port": 1883
}, },
"EncryptedTcpEndPoint": { "EncryptedTcpEndPoint": {
"Enabled": false, "Enabled": false,
"IPv4": "localhost",
"IPv6": "localhost",
"IPv4": "*",
"IPv6": "*",
"Port": 8883, "Port": 8883,
"CertificatePath": "/absolute/path/to/pfx" "CertificatePath": "/absolute/path/to/pfx"
}, },
@@ -45,7 +45,7 @@
"Filename": "RetainedApplicationMessages.json", "Filename": "RetainedApplicationMessages.json",
"WriteInterval": 10 // In seconds. "WriteInterval": 10 // In seconds.
}, },
"EnableDebugLogging": false
"EnableDebugLogging": true
}, },
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {


+ 1
- 1
Source/MQTTnet/Adapter/IMqttServerAdapter.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Adapter
{ {
public interface IMqttServerAdapter : IDisposable public interface IMqttServerAdapter : IDisposable
{ {
Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


Task StartAsync(IMqttServerOptions options); Task StartAsync(IMqttServerOptions options);
Task StopAsync(); Task StopAsync();


+ 0
- 17
Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs View File

@@ -1,17 +0,0 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Adapter
{
public class MqttServerAdapterClientAcceptedEventArgs : EventArgs
{
public MqttServerAdapterClientAcceptedEventArgs(IMqttChannelAdapter channelAdapter)
{
ChannelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
}

public IMqttChannelAdapter ChannelAdapter { get; }

public Task SessionTask { get; set; }
}
}

+ 32
- 11
Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs View File

@@ -23,7 +23,7 @@ namespace MQTTnet.Implementations
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
} }


public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


public async Task StartAsync(IMqttServerOptions options) public async Task StartAsync(IMqttServerOptions options)
{ {
@@ -39,7 +39,7 @@ namespace MQTTnet.Implementations
_listener.Control.NoDelay = options.DefaultEndpointOptions.NoDelay; _listener.Control.NoDelay = options.DefaultEndpointOptions.NoDelay;
_listener.Control.KeepAlive = true; _listener.Control.KeepAlive = true;
_listener.Control.QualityOfService = SocketQualityOfService.LowLatency; _listener.Control.QualityOfService = SocketQualityOfService.LowLatency;
_listener.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync;
_listener.ConnectionReceived += OnConnectionReceivedAsync;
await _listener.BindServiceNameAsync(options.DefaultEndpointOptions.Port.ToString(), SocketProtectionLevel.PlainSocket); await _listener.BindServiceNameAsync(options.DefaultEndpointOptions.Port.ToString(), SocketProtectionLevel.PlainSocket);
} }
@@ -54,30 +54,51 @@ namespace MQTTnet.Implementations
{ {
if (_listener != null) if (_listener != null)
{ {
_listener.ConnectionReceived -= AcceptDefaultEndpointConnectionsAsync;
_listener.ConnectionReceived -= OnConnectionReceivedAsync;
} }


_listener?.Dispose();
_listener = null;

return Task.FromResult(0); return Task.FromResult(0);
} }


public void Dispose() public void Dispose()
{ {
StopAsync().GetAwaiter().GetResult();
_listener?.Dispose();
_listener = null;
} }


private void AcceptDefaultEndpointConnectionsAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
{ {
try try
{ {
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger);
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
var clientHandler = ClientHandler;
if (clientHandler != null)
{
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger))
{
await clientHandler(clientAdapter).ConfigureAwait(false);
}
}
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error(exception, "Error while accepting connection at default endpoint.");
if (exception is ObjectDisposedException)
{
// It can happen that the listener socket is accessed after the cancellation token is already set and the listener socket is disposed.
return;
}

_logger.Error(exception, "Error while handling client connection.");
}
finally
{
try
{
args.Socket.Dispose();
}
catch (Exception exception)
{
_logger.Error(exception, "Error while cleaning up client connection");
}
} }
} }
} }


+ 11
- 5
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs View File

@@ -26,7 +26,7 @@ namespace MQTTnet.Implementations
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
} }


public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


public Task StartAsync(IMqttServerOptions options) public Task StartAsync(IMqttServerOptions options)
{ {
@@ -89,7 +89,7 @@ namespace MQTTnet.Implementations
_cancellationTokenSource.Token, _cancellationTokenSource.Token,
_logger) _logger)
{ {
ClientAcceptedHandler = OnClientAccepted
ClientHandler = OnClientAcceptedAsync
}; };


listenerV4.Start(); listenerV4.Start();
@@ -105,7 +105,7 @@ namespace MQTTnet.Implementations
_cancellationTokenSource.Token, _cancellationTokenSource.Token,
_logger) _logger)
{ {
ClientAcceptedHandler = OnClientAccepted
ClientHandler = OnClientAcceptedAsync
}; };


listenerV6.Start(); listenerV6.Start();
@@ -113,9 +113,15 @@ namespace MQTTnet.Implementations
} }
} }


private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs)
private Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter)
{ {
ClientAcceptedHandler?.Invoke(eventArgs);
var clientHandler = ClientHandler;
if (clientHandler == null)
{
return Task.FromResult(0);
}

return clientHandler(channelAdapter);
} }
} }
} }

+ 25
- 7
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -43,7 +43,7 @@ namespace MQTTnet.Implementations
} }
} }


public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }


public void Start() public void Start()
{ {
@@ -97,11 +97,14 @@ namespace MQTTnet.Implementations
private async Task TryHandleClientConnectionAsync(Socket clientSocket) private async Task TryHandleClientConnectionAsync(Socket clientSocket)
{ {
Stream stream = null; Stream stream = null;
EndPoint remoteEndPoint = null;


try try
{ {
remoteEndPoint = clientSocket.RemoteEndPoint;

_logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.", _logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.",
clientSocket.RemoteEndPoint,
remoteEndPoint,
_socket.LocalEndPoint, _socket.LocalEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");


@@ -116,8 +119,14 @@ namespace MQTTnet.Implementations
stream = sslStream; stream = sslStream;
} }


var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream), new MqttPacketFormatterAdapter(), _logger);
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
var clientHandler = ClientHandler;
if (clientHandler != null)
{
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream), new MqttPacketFormatterAdapter(), _logger))
{
await clientHandler(clientAdapter).ConfigureAwait(false);
}
}
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -127,20 +136,29 @@ namespace MQTTnet.Implementations
return; return;
} }


if (exception is SocketException socketException && socketException.SocketErrorCode == SocketError.OperationAborted)
if (exception is SocketException socketException &&
socketException.SocketErrorCode == SocketError.OperationAborted)
{ {
return; return;
} }


_logger.Error(exception, "Error while handling client connection.");
}
finally
{
try try
{ {
// Dispose already allocated resources.
stream?.Dispose(); stream?.Dispose();
clientSocket?.Dispose(); clientSocket?.Dispose();

_logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.",
remoteEndPoint,
_socket.LocalEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");
} }
catch (Exception disposeException) catch (Exception disposeException)
{ {
_logger.Error(disposeException, "Error while cleanup of broken connection.");
_logger.Error(disposeException, "Error while cleaning up client connection");
} }
} }
} }


+ 0
- 1
Source/MQTTnet/Implementations/MqttWebSocketChannel.cs View File

@@ -1,5 +1,4 @@
using System; using System;
using System.Globalization;
using System.Net; using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;


+ 2
- 9
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -27,12 +27,12 @@ namespace MQTTnet.Server
private readonly IMqttNetChildLogger _logger; private readonly IMqttNetChildLogger _logger;
private readonly IMqttServerOptions _serverOptions; private readonly IMqttServerOptions _serverOptions;


private Task<MqttClientDisconnectType> _packageReceiverTask;
private readonly IMqttChannelAdapter _channelAdapter; private readonly IMqttChannelAdapter _channelAdapter;
private readonly IMqttDataConverter _dataConverter; private readonly IMqttDataConverter _dataConverter;
private readonly string _endpoint; private readonly string _endpoint;
private readonly MqttConnectPacket _connectPacket; private readonly MqttConnectPacket _connectPacket;


private Task<MqttClientDisconnectType> _packageReceiverTask;
private DateTime _lastPacketReceivedTimestamp; private DateTime _lastPacketReceivedTimestamp;
private DateTime _lastNonKeepAlivePacketReceivedTimestamp; private DateTime _lastNonKeepAlivePacketReceivedTimestamp;


@@ -78,7 +78,7 @@ namespace MQTTnet.Server
StopInternal(); StopInternal();


var task = _packageReceiverTask; var task = _packageReceiverTask;
if (task != null && !task.IsCompleted)
if (task != null)
{ {
await task.ConfigureAwait(false); await task.ConfigureAwait(false);
} }
@@ -103,13 +103,6 @@ namespace MQTTnet.Server
status.BytesReceived = _channelAdapter.BytesReceived; status.BytesReceived = _channelAdapter.BytesReceived;
} }
//public void ClearPendingApplicationMessages()
//{
// Session.ApplicationMessagesQueue.Clear();

// //_applicationMessagesQueue.Clear();
//}

public void Dispose() public void Dispose()
{ {
_cancellationToken.Dispose(); _cancellationToken.Dispose();


+ 3
- 7
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -57,9 +57,9 @@ namespace MQTTnet.Server
} }
} }


public Task HandleConnectionAsync(IMqttChannelAdapter clientAdapter)
public Task HandleClientAsync(IMqttChannelAdapter clientAdapter)
{ {
return HandleConnectionAsync(clientAdapter, _cancellationToken);
return HandleClientAsync(clientAdapter, _cancellationToken);
} }


public Task<IList<IMqttClientStatus>> GetClientStatusAsync() public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
@@ -221,7 +221,7 @@ namespace MQTTnet.Server
} }
} }


private async Task HandleConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{ {
var disconnectType = MqttClientDisconnectType.NotClean; var disconnectType = MqttClientDisconnectType.NotClean;
var clientId = string.Empty; var clientId = string.Empty;
@@ -376,10 +376,6 @@ namespace MQTTnet.Server
{ {
_logger.Error(exception, "Error while disconnecting client channel."); _logger.Error(exception, "Error while disconnecting client channel.");
} }
finally
{
channelAdapter.Dispose();
}
} }
} }
} }

+ 4
- 4
Source/MQTTnet/Server/MqttServer.cs View File

@@ -127,7 +127,7 @@ namespace MQTTnet.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientAcceptedHandler = OnClientAccepted;
adapter.ClientHandler = OnHandleClient;
await adapter.StartAsync(Options).ConfigureAwait(false); await adapter.StartAsync(Options).ConfigureAwait(false);
} }


@@ -155,7 +155,7 @@ namespace MQTTnet.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientAcceptedHandler = null;
adapter.ClientHandler = null;
await adapter.StopAsync().ConfigureAwait(false); await adapter.StopAsync().ConfigureAwait(false);
} }


@@ -184,9 +184,9 @@ namespace MQTTnet.Server
return _retainedMessagesManager?.ClearMessagesAsync(); return _retainedMessagesManager?.ClearMessagesAsync();
} }


private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs)
private Task OnHandleClient(IMqttChannelAdapter channelAdapter)
{ {
eventArgs.SessionTask = _clientSessionsManager.HandleConnectionAsync(eventArgs.ChannelAdapter);
return _clientSessionsManager.HandleClientAsync(channelAdapter);
} }
} }
} }

+ 9
- 1
Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs View File

@@ -21,8 +21,16 @@ namespace MQTTnet.Benchmarks
{ {
var factory = new MqttFactory(); var factory = new MqttFactory();
var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
tcpServer.ClientAcceptedHandler += args => _serverChannel = (IMqttChannel)args.ChannelAdapter.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.ChannelAdapter);
tcpServer.ClientHandler += args =>
{
_serverChannel =
(IMqttChannel)args.GetType().GetField("_channel",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)
.GetValue(args);


return Task.CompletedTask;
};
_mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger());


var serverOptions = new MqttServerOptionsBuilder().Build(); var serverOptions = new MqttServerOptionsBuilder().Build();


+ 0
- 61
Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs View File

@@ -1,61 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Diagnostics;
using MQTTnet.Server;

namespace MQTTnet.Tests.Mockups
{
public class TestMqttServerAdapter : IMqttServerAdapter
{
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public async Task<IMqttClient> ConnectTestClient(string clientId, MqttApplicationMessage willMessage = null)
{
var adapterA = new TestMqttCommunicationAdapter();
var adapterB = new TestMqttCommunicationAdapter();
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;

var client = new MqttClient(
new TestMqttCommunicationAdapterFactory(adapterA),
new MqttNetLogger());

FireClientAcceptedEvent(adapterB);

var options = new MqttClientOptions
{
ClientId = clientId,
WillMessage = willMessage,
ChannelOptions = new MqttClientTcpOptions()
};

await client.ConnectAsync(options);
SpinWait.SpinUntil(() => client.IsConnected);

return client;
}
private void FireClientAcceptedEvent(IMqttChannelAdapter adapter)
{
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(adapter));
}

public Task StartAsync(IMqttServerOptions options)
{
return Task.FromResult(0);
}

public Task StopAsync()
{
return Task.FromResult(0);
}

public void Dispose()
{
}
}
}

Loading…
Cancel
Save