From 63991725824e3b3ccc5b5490c12786955cdcfd4c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Mon, 22 Apr 2019 10:50:47 +0200 Subject: [PATCH] Improve connection disposal. --- .../ApplicationBuilderExtensions.cs | 2 +- .../MqttConnectionContext.cs | 14 ++--- .../MqttConnectionHandler.cs | 11 ++-- .../MqttWebSocketServerAdapter.cs | 21 +++---- .../MQTTnet.AspnetCore/TopicFilterBuilder.cs | 51 ++++++++++++++++ .../Configuration/SettingsModel.cs | 3 + .../Configuration/TcpEndpointModel.cs | 20 +++--- .../MQTTnet.Server/Mqtt/MqttServerService.cs | 1 + Source/MQTTnet.Server/appsettings.json | 10 +-- Source/MQTTnet/Adapter/IMqttServerAdapter.cs | 2 +- ...qttServerAdapterClientAcceptedEventArgs.cs | 17 ------ .../MqttTcpServerAdapter.Uwp.cs | 43 +++++++++---- .../Implementations/MqttTcpServerAdapter.cs | 16 +++-- .../Implementations/MqttTcpServerListener.cs | 32 +++++++--- .../Implementations/MqttWebSocketChannel.cs | 1 - Source/MQTTnet/Server/MqttClientConnection.cs | 11 +--- .../Server/MqttClientSessionsManager.cs | 10 +-- Source/MQTTnet/Server/MqttServer.cs | 8 +-- .../MqttTcpChannelBenchmark.cs | 10 ++- .../Mockups/TestMqttServerAdapter.cs | 61 ------------------- 20 files changed, 179 insertions(+), 165 deletions(-) create mode 100644 Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs delete mode 100644 Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs delete mode 100644 Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs diff --git a/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs b/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs index 4c6a45f..94fc8e4 100644 --- a/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs @@ -28,7 +28,7 @@ namespace MQTTnet.AspNetCore } var adapter = app.ApplicationServices.GetRequiredService(); - using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol)) + using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol).ConfigureAwait(false)) { await adapter.RunWebSocketConnectionAsync(webSocket, context); } diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index 20ff314..6febcba 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -24,8 +24,7 @@ namespace MQTTnet.AspNetCore _input = Connection.Transport.Input; _output = Connection.Transport.Output; } - - + _reader = new SpanBasedMqttPacketBodyReader(); } @@ -33,9 +32,10 @@ namespace MQTTnet.AspNetCore private PipeWriter _output; private readonly SpanBasedMqttPacketBodyReader _reader; - public string Endpoint + public string Endpoint { - get { + get + { var connection = Http?.HttpContext?.Connection; if (connection == null) { @@ -53,12 +53,12 @@ namespace MQTTnet.AspNetCore public ConnectionContext Connection { get; } public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } - public long BytesSent { get; set; } + public long BytesSent { get; set; } public long BytesReceived { get; set; } public Action ReadingPacketStartedCallback { get; set; } public Action ReadingPacketCompletedCallback { get; set; } - + private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) @@ -145,7 +145,7 @@ namespace MQTTnet.AspNetCore public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) { var formatter = PacketFormatterAdapter; - + await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs index f3cb91a..8cc1022 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs @@ -10,7 +10,7 @@ namespace MQTTnet.AspNetCore { public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter { - public Action ClientAcceptedHandler { get; set; } + public Func ClientHandler { get; set; } public override async Task OnConnectedAsync(ConnectionContext connection) { @@ -25,10 +25,11 @@ namespace MQTTnet.AspNetCore var formatter = new MqttPacketFormatterAdapter(writer); using (var adapter = new MqttConnectionContext(formatter, connection)) { - var args = new MqttServerAdapterClientAcceptedEventArgs(adapter); - ClientAcceptedHandler?.Invoke(args); - - await args.SessionTask.ConfigureAwait(false); + var clientHandler = ClientHandler; + if (clientHandler != null) + { + await clientHandler(adapter).ConfigureAwait(false); + } } } diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 8cb0d6f..455840c 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -21,7 +21,7 @@ namespace MQTTnet.AspNetCore _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } - public Action ClientAcceptedHandler { get; set; } + public Func ClientHandler { get; set; } public Task StartAsync(IMqttServerOptions options) { @@ -43,17 +43,16 @@ namespace MQTTnet.AspNetCore var isSecureConnection = clientCertificate != null; clientCertificate?.Dispose(); - var writer = new SpanBasedMqttPacketWriter(); - var formatter = new MqttPacketFormatterAdapter(writer); - var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection); - var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter))); - - var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(channelAdapter); - ClientAcceptedHandler?.Invoke(eventArgs); - - if (eventArgs.SessionTask != null) + var clientHandler = ClientHandler; + if (clientHandler != null) { - await eventArgs.SessionTask.ConfigureAwait(false); + var writer = new SpanBasedMqttPacketWriter(); + var formatter = new MqttPacketFormatterAdapter(writer); + var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection); + using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter)))) + { + await clientHandler(channelAdapter).ConfigureAwait(false); + } } } diff --git a/Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs b/Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs new file mode 100644 index 0000000..a480263 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs @@ -0,0 +1,51 @@ +using MQTTnet.Exceptions; +using MQTTnet.Protocol; + +namespace MQTTnet +{ + public class TopicFilterBuilder + { + private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + private string _topic; + + public TopicFilterBuilder WithTopic(string topic) + { + _topic = topic; + return this; + } + + public TopicFilterBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) + { + _qualityOfServiceLevel = qualityOfServiceLevel; + return this; + } + + public TopicFilterBuilder WithAtLeastOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; + return this; + } + + public TopicFilterBuilder WithAtMostOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + return this; + } + + public TopicFilterBuilder WithExactlyOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; + return this; + } + + public TopicFilter Build() + { + if (string.IsNullOrEmpty(_topic)) + { + throw new MqttProtocolViolationException("Topic is not set."); + } + + return new TopicFilter { Topic = _topic, QualityOfServiceLevel = _qualityOfServiceLevel }; + } + } +} diff --git a/Source/MQTTnet.Server/Configuration/SettingsModel.cs b/Source/MQTTnet.Server/Configuration/SettingsModel.cs index 7aeab46..ce5336d 100644 --- a/Source/MQTTnet.Server/Configuration/SettingsModel.cs +++ b/Source/MQTTnet.Server/Configuration/SettingsModel.cs @@ -45,6 +45,9 @@ /// public RetainedApplicationMessagesModel RetainedApplicationMessages { get; set; } = new RetainedApplicationMessagesModel(); + /// + /// Enables or disables the MQTTnet internal logging. + /// public bool EnableDebugLogging { get; set; } = false; } } \ No newline at end of file diff --git a/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs b/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs index 0f006c2..8221390 100644 --- a/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs +++ b/Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs @@ -60,13 +60,13 @@ namespace MQTTnet.Server.Configuration { if (IPv4 == "*") { - address = IPAddress.Parse("0.0.0.0"); + address = IPAddress.Any; return true; } if (IPv4 == "localhost") { - address = IPAddress.Parse("127.0.0.1"); + address = IPAddress.Loopback; return true; } @@ -81,10 +81,8 @@ namespace MQTTnet.Server.Configuration address = ip; return true; } - else - { - throw new System.Exception($"Could not parse IPv4 address: {IPv4}"); - } + + throw new System.Exception($"Could not parse IPv4 address: {IPv4}"); } /// @@ -95,13 +93,13 @@ namespace MQTTnet.Server.Configuration { if (IPv6 == "*") { - address = IPAddress.Parse("::"); + address = IPAddress.IPv6Any; return true; } if (IPv6 == "localhost") { - address = IPAddress.Parse("::1"); + address = IPAddress.IPv6Loopback; return true; } @@ -116,10 +114,8 @@ namespace MQTTnet.Server.Configuration address = ip; return true; } - else - { - throw new System.Exception($"Could not parse IPv6 address: {IPv6}"); - } + + throw new System.Exception($"Could not parse IPv6 address: {IPv6}"); } } } \ No newline at end of file diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs index c25108a..7b796f6 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs @@ -152,6 +152,7 @@ namespace MQTTnet.Server.Mqtt if (_settings.TcpEndPoint.Enabled) { options.WithDefaultEndpoint(); + if (_settings.TcpEndPoint.TryReadIPv4(out var address4)) { options.WithDefaultEndpointBoundIPAddress(address4); diff --git a/Source/MQTTnet.Server/appsettings.json b/Source/MQTTnet.Server/appsettings.json index 1cf0bc4..2ee0eaf 100644 --- a/Source/MQTTnet.Server/appsettings.json +++ b/Source/MQTTnet.Server/appsettings.json @@ -18,14 +18,14 @@ */ "TcpEndPoint": { "Enabled": true, - "IPv4": "localhost", - "IPv6": "localhost", + "IPv4": "*", + "IPv6": "*", "Port": 1883 }, "EncryptedTcpEndPoint": { "Enabled": false, - "IPv4": "localhost", - "IPv6": "localhost", + "IPv4": "*", + "IPv6": "*", "Port": 8883, "CertificatePath": "/absolute/path/to/pfx" }, @@ -45,7 +45,7 @@ "Filename": "RetainedApplicationMessages.json", "WriteInterval": 10 // In seconds. }, - "EnableDebugLogging": false + "EnableDebugLogging": true }, "Logging": { "LogLevel": { diff --git a/Source/MQTTnet/Adapter/IMqttServerAdapter.cs b/Source/MQTTnet/Adapter/IMqttServerAdapter.cs index 2e0bfd1..52cda51 100644 --- a/Source/MQTTnet/Adapter/IMqttServerAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttServerAdapter.cs @@ -6,7 +6,7 @@ namespace MQTTnet.Adapter { public interface IMqttServerAdapter : IDisposable { - Action ClientAcceptedHandler { get; set; } + Func ClientHandler { get; set; } Task StartAsync(IMqttServerOptions options); Task StopAsync(); diff --git a/Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs b/Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs deleted file mode 100644 index 0c7db4a..0000000 --- a/Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace MQTTnet.Adapter -{ - public class MqttServerAdapterClientAcceptedEventArgs : EventArgs - { - public MqttServerAdapterClientAcceptedEventArgs(IMqttChannelAdapter channelAdapter) - { - ChannelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter)); - } - - public IMqttChannelAdapter ChannelAdapter { get; } - - public Task SessionTask { get; set; } - } -} diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs index 5ceba0e..4529075 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -23,7 +23,7 @@ namespace MQTTnet.Implementations _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } - public Action ClientAcceptedHandler { get; set; } + public Func ClientHandler { get; set; } public async Task StartAsync(IMqttServerOptions options) { @@ -39,7 +39,7 @@ namespace MQTTnet.Implementations _listener.Control.NoDelay = options.DefaultEndpointOptions.NoDelay; _listener.Control.KeepAlive = true; _listener.Control.QualityOfService = SocketQualityOfService.LowLatency; - _listener.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync; + _listener.ConnectionReceived += OnConnectionReceivedAsync; await _listener.BindServiceNameAsync(options.DefaultEndpointOptions.Port.ToString(), SocketProtectionLevel.PlainSocket); } @@ -54,30 +54,51 @@ namespace MQTTnet.Implementations { if (_listener != null) { - _listener.ConnectionReceived -= AcceptDefaultEndpointConnectionsAsync; + _listener.ConnectionReceived -= OnConnectionReceivedAsync; } - _listener?.Dispose(); - _listener = null; - return Task.FromResult(0); } public void Dispose() { - StopAsync().GetAwaiter().GetResult(); + _listener?.Dispose(); + _listener = null; } - private void AcceptDefaultEndpointConnectionsAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) + private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) { try { - var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger); - ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); + var clientHandler = ClientHandler; + if (clientHandler != null) + { + using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger)) + { + await clientHandler(clientAdapter).ConfigureAwait(false); + } + } } catch (Exception exception) { - _logger.Error(exception, "Error while accepting connection at default endpoint."); + if (exception is ObjectDisposedException) + { + // It can happen that the listener socket is accessed after the cancellation token is already set and the listener socket is disposed. + return; + } + + _logger.Error(exception, "Error while handling client connection."); + } + finally + { + try + { + args.Socket.Dispose(); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while cleaning up client connection"); + } } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 92ccb81..900ba96 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -26,7 +26,7 @@ namespace MQTTnet.Implementations _logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter)); } - public Action ClientAcceptedHandler { get; set; } + public Func ClientHandler { get; set; } public Task StartAsync(IMqttServerOptions options) { @@ -89,7 +89,7 @@ namespace MQTTnet.Implementations _cancellationTokenSource.Token, _logger) { - ClientAcceptedHandler = OnClientAccepted + ClientHandler = OnClientAcceptedAsync }; listenerV4.Start(); @@ -105,7 +105,7 @@ namespace MQTTnet.Implementations _cancellationTokenSource.Token, _logger) { - ClientAcceptedHandler = OnClientAccepted + ClientHandler = OnClientAcceptedAsync }; listenerV6.Start(); @@ -113,9 +113,15 @@ namespace MQTTnet.Implementations } } - private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs) + private Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter) { - ClientAcceptedHandler?.Invoke(eventArgs); + var clientHandler = ClientHandler; + if (clientHandler == null) + { + return Task.FromResult(0); + } + + return clientHandler(channelAdapter); } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index b459536..51112e8 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -43,7 +43,7 @@ namespace MQTTnet.Implementations } } - public Action ClientAcceptedHandler { get; set; } + public Func ClientHandler { get; set; } public void Start() { @@ -97,11 +97,14 @@ namespace MQTTnet.Implementations private async Task TryHandleClientConnectionAsync(Socket clientSocket) { Stream stream = null; + EndPoint remoteEndPoint = null; try { + remoteEndPoint = clientSocket.RemoteEndPoint; + _logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.", - clientSocket.RemoteEndPoint, + remoteEndPoint, _socket.LocalEndPoint, _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); @@ -116,8 +119,14 @@ namespace MQTTnet.Implementations stream = sslStream; } - var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream), new MqttPacketFormatterAdapter(), _logger); - ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); + var clientHandler = ClientHandler; + if (clientHandler != null) + { + using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream), new MqttPacketFormatterAdapter(), _logger)) + { + await clientHandler(clientAdapter).ConfigureAwait(false); + } + } } catch (Exception exception) { @@ -127,20 +136,29 @@ namespace MQTTnet.Implementations return; } - if (exception is SocketException socketException && socketException.SocketErrorCode == SocketError.OperationAborted) + if (exception is SocketException socketException && + socketException.SocketErrorCode == SocketError.OperationAborted) { return; } + _logger.Error(exception, "Error while handling client connection."); + } + finally + { try { - // Dispose already allocated resources. stream?.Dispose(); clientSocket?.Dispose(); + + _logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.", + remoteEndPoint, + _socket.LocalEndPoint, + _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); } catch (Exception disposeException) { - _logger.Error(disposeException, "Error while cleanup of broken connection."); + _logger.Error(disposeException, "Error while cleaning up client connection"); } } } diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index 59329a5..ccc601a 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -1,5 +1,4 @@ using System; -using System.Globalization; using System.Net; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 9b270c0..4d870d5 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -27,12 +27,12 @@ namespace MQTTnet.Server private readonly IMqttNetChildLogger _logger; private readonly IMqttServerOptions _serverOptions; - private Task _packageReceiverTask; private readonly IMqttChannelAdapter _channelAdapter; private readonly IMqttDataConverter _dataConverter; private readonly string _endpoint; private readonly MqttConnectPacket _connectPacket; + private Task _packageReceiverTask; private DateTime _lastPacketReceivedTimestamp; private DateTime _lastNonKeepAlivePacketReceivedTimestamp; @@ -78,7 +78,7 @@ namespace MQTTnet.Server StopInternal(); var task = _packageReceiverTask; - if (task != null && !task.IsCompleted) + if (task != null) { await task.ConfigureAwait(false); } @@ -103,13 +103,6 @@ namespace MQTTnet.Server status.BytesReceived = _channelAdapter.BytesReceived; } - //public void ClearPendingApplicationMessages() - //{ - // Session.ApplicationMessagesQueue.Clear(); - - // //_applicationMessagesQueue.Clear(); - //} - public void Dispose() { _cancellationToken.Dispose(); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index a6c8c5d..cd1aa1a 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -57,9 +57,9 @@ namespace MQTTnet.Server } } - public Task HandleConnectionAsync(IMqttChannelAdapter clientAdapter) + public Task HandleClientAsync(IMqttChannelAdapter clientAdapter) { - return HandleConnectionAsync(clientAdapter, _cancellationToken); + return HandleClientAsync(clientAdapter, _cancellationToken); } public Task> GetClientStatusAsync() @@ -221,7 +221,7 @@ namespace MQTTnet.Server } } - private async Task HandleConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) + private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { var disconnectType = MqttClientDisconnectType.NotClean; var clientId = string.Empty; @@ -376,10 +376,6 @@ namespace MQTTnet.Server { _logger.Error(exception, "Error while disconnecting client channel."); } - finally - { - channelAdapter.Dispose(); - } } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 06d6104..0055639 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -127,7 +127,7 @@ namespace MQTTnet.Server foreach (var adapter in _adapters) { - adapter.ClientAcceptedHandler = OnClientAccepted; + adapter.ClientHandler = OnHandleClient; await adapter.StartAsync(Options).ConfigureAwait(false); } @@ -155,7 +155,7 @@ namespace MQTTnet.Server foreach (var adapter in _adapters) { - adapter.ClientAcceptedHandler = null; + adapter.ClientHandler = null; await adapter.StopAsync().ConfigureAwait(false); } @@ -184,9 +184,9 @@ namespace MQTTnet.Server return _retainedMessagesManager?.ClearMessagesAsync(); } - private void OnClientAccepted(MqttServerAdapterClientAcceptedEventArgs eventArgs) + private Task OnHandleClient(IMqttChannelAdapter channelAdapter) { - eventArgs.SessionTask = _clientSessionsManager.HandleConnectionAsync(eventArgs.ChannelAdapter); + return _clientSessionsManager.HandleClientAsync(channelAdapter); } } } diff --git a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs index b8c54c4..04761e7 100644 --- a/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -21,8 +21,16 @@ namespace MQTTnet.Benchmarks { var factory = new MqttFactory(); var tcpServer = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); - tcpServer.ClientAcceptedHandler += args => _serverChannel = (IMqttChannel)args.ChannelAdapter.GetType().GetField("_channel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance).GetValue(args.ChannelAdapter); + tcpServer.ClientHandler += args => + { + _serverChannel = + (IMqttChannel)args.GetType().GetField("_channel", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance) + .GetValue(args); + return Task.CompletedTask; + }; + _mqttServer = factory.CreateMqttServer(new[] { tcpServer }, new MqttNetLogger()); var serverOptions = new MqttServerOptionsBuilder().Build(); diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs deleted file mode 100644 index b2970fb..0000000 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttServerAdapter.cs +++ /dev/null @@ -1,61 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Adapter; -using MQTTnet.Client; -using MQTTnet.Client.Options; -using MQTTnet.Diagnostics; -using MQTTnet.Server; - -namespace MQTTnet.Tests.Mockups -{ - public class TestMqttServerAdapter : IMqttServerAdapter - { - public Action ClientAcceptedHandler { get; set; } - - public async Task ConnectTestClient(string clientId, MqttApplicationMessage willMessage = null) - { - var adapterA = new TestMqttCommunicationAdapter(); - var adapterB = new TestMqttCommunicationAdapter(); - adapterA.Partner = adapterB; - adapterB.Partner = adapterA; - - var client = new MqttClient( - new TestMqttCommunicationAdapterFactory(adapterA), - new MqttNetLogger()); - - FireClientAcceptedEvent(adapterB); - - var options = new MqttClientOptions - { - ClientId = clientId, - WillMessage = willMessage, - ChannelOptions = new MqttClientTcpOptions() - }; - - await client.ConnectAsync(options); - SpinWait.SpinUntil(() => client.IsConnected); - - return client; - } - - private void FireClientAcceptedEvent(IMqttChannelAdapter adapter) - { - ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(adapter)); - } - - public Task StartAsync(IMqttServerOptions options) - { - return Task.FromResult(0); - } - - public Task StopAsync() - { - return Task.FromResult(0); - } - - public void Dispose() - { - } - } -} \ No newline at end of file