diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index c0ab6a6..159137c 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public Task StartAsync(MqttServerOptions options) { @@ -93,7 +93,7 @@ namespace MQTTnet.Implementations var tcpChannel = new MqttTcpChannel(clientSocket, null); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) when (!(exception is ObjectDisposedException)) { @@ -118,7 +118,7 @@ namespace MQTTnet.Implementations var tcpChannel = new MqttTcpChannel(clientSocket, sslStream); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 325f622..6a90109 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public Task StartAsync(MqttServerOptions options) { @@ -90,7 +90,7 @@ namespace MQTTnet.Implementations { var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { @@ -114,7 +114,7 @@ namespace MQTTnet.Implementations await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index 3c1300c..7206f54 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public async Task StartAsync(MqttServerOptions options) { @@ -57,7 +57,7 @@ namespace MQTTnet.Implementations try { var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/MQTTnet.Core/Adapter/IMqttServerAdapter.cs b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs index 416139a..b1546ef 100644 --- a/MQTTnet.Core/Adapter/IMqttServerAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs @@ -6,7 +6,7 @@ namespace MQTTnet.Core.Adapter { public interface IMqttServerAdapter { - event EventHandler ClientConnected; + event Action ClientAccepted; Task StartAsync(MqttServerOptions options); Task StopAsync(); diff --git a/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs index fc0fd8b..ade4d9d 100644 --- a/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs @@ -1,17 +1,15 @@ -using System; +using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Adapter { public class MqttClientConnectedEventArgs : EventArgs { - public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter) + public MqttClientConnectedEventArgs(ConnectedMqttClient client) { - Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier)); - ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + Client = client ?? throw new ArgumentNullException(nameof(client)); } - public string Identifier { get; } - - public IMqttCommunicationAdapter ClientAdapter { get; } + public ConnectedMqttClient Client { get; } } } diff --git a/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs b/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs index 48d49c1..8316d40 100644 --- a/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs @@ -1,17 +1,15 @@ -using System; +using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Adapter { public class MqttClientDisconnectedEventArgs : EventArgs { - public MqttClientDisconnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter) + public MqttClientDisconnectedEventArgs(ConnectedMqttClient client) { - Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier)); - ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + Client = client ?? throw new ArgumentNullException(nameof(client)); } - - public string Identifier { get; } - - public IMqttCommunicationAdapter ClientAdapter { get; } + + public ConnectedMqttClient Client { get; } } } diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index c054fed..cbe7582 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Server event EventHandler ClientDisconnected; IList GetConnectedClients(); - void InjectClient(string identifier, IMqttCommunicationAdapter adapter); + void InjectClient(IMqttCommunicationAdapter adapter); void Publish(MqttApplicationMessage applicationMessage); Task StartAsync(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 2115508..315f991 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -19,8 +19,7 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; - - private string _identifier; + private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; @@ -38,7 +37,7 @@ namespace MQTTnet.Core.Server public IMqttCommunicationAdapter Adapter { get; private set; } - public async Task RunAsync(string identifier, MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) + public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -46,7 +45,6 @@ namespace MQTTnet.Core.Server try { - _identifier = identifier; Adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); @@ -58,11 +56,11 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier); + MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier); + MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } } @@ -79,7 +77,7 @@ namespace MQTTnet.Core.Server Adapter = null; - MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", _identifier); + MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); } public void EnqueuePublishPacket(MqttPublishPacket publishPacket) @@ -92,7 +90,7 @@ namespace MQTTnet.Core.Server } _pendingMessagesQueue.Enqueue(publishPacket); - MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier); + MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); } public void Dispose() @@ -116,12 +114,12 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier); + MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); Stop(); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier); + MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); Stop(); } } @@ -163,7 +161,7 @@ namespace MQTTnet.Core.Server } else { - MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet); + MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); _cancellationTokenSource.Cancel(); } } @@ -181,7 +179,7 @@ namespace MQTTnet.Core.Server { if (publishPacket.Retain) { - await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(_identifier, publishPacket); + await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 71502ac..599d168 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -25,16 +25,18 @@ namespace MQTTnet.Core.Server public event EventHandler ApplicationMessageReceived; + public event EventHandler ClientConnected; + public event EventHandler ClientDisconnected; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } - public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) + public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) { var clientId = string.Empty; try { - if (!(await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) + if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } @@ -42,12 +44,12 @@ namespace MQTTnet.Core.Server clientId = connectPacket.ClientId; // Switch to the required protocol version before sending any response. - eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; + clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode }).ConfigureAwait(false); @@ -57,13 +59,19 @@ namespace MQTTnet.Core.Server var clientSession = GetOrCreateClientSession(connectPacket); - await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession }).ConfigureAwait(false); - await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient + { + ClientId = clientId, + ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion + })); + + await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); } catch (Exception exception) { @@ -71,8 +79,20 @@ namespace MQTTnet.Core.Server } finally { - await eventArgs.ClientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, eventArgs.ClientAdapter)); + try + { + await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); + } + catch (Exception) + { + //ignored + } + + ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient + { + ClientId = clientId, + ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion + })); } } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index fd5a4dc..5b5f70a 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -23,6 +23,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager = new MqttClientSessionsManager(options); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); + _clientSessionsManager.ClientConnected += OnClientConnected; _clientSessionsManager.ClientDisconnected += OnClientDisconnected; } @@ -42,12 +43,12 @@ namespace MQTTnet.Core.Server _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); } - public void InjectClient(string identifier, IMqttCommunicationAdapter adapter) + public void InjectClient(IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started."); - OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter)); + OnClientAccepted(adapter); } public async Task StartAsync() @@ -60,7 +61,7 @@ namespace MQTTnet.Core.Server foreach (var adapter in _adapters) { - adapter.ClientConnected += OnClientConnected; + adapter.ClientAccepted += OnClientAccepted; await adapter.StartAsync(_options); } @@ -75,7 +76,7 @@ namespace MQTTnet.Core.Server foreach (var adapter in _adapters) { - adapter.ClientConnected -= OnClientConnected; + adapter.ClientAccepted -= OnClientAccepted; await adapter.StopAsync(); } @@ -84,17 +85,20 @@ namespace MQTTnet.Core.Server MqttTrace.Information(nameof(MqttServer), "Stopped."); } + private void OnClientAccepted(IMqttCommunicationAdapter adapter) + { + Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(adapter), _cancellationTokenSource.Token); + } + private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier); + MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); ClientConnected?.Invoke(this, eventArgs); - - Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token); } private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { - MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Identifier); + MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); ClientDisconnected?.Invoke(this, eventArgs); } }