diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index c0ab6a6..159137c 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public Task StartAsync(MqttServerOptions options) { @@ -93,7 +93,7 @@ namespace MQTTnet.Implementations var tcpChannel = new MqttTcpChannel(clientSocket, null); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) when (!(exception is ObjectDisposedException)) { @@ -118,7 +118,7 @@ namespace MQTTnet.Implementations var tcpChannel = new MqttTcpChannel(clientSocket, sslStream); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 325f622..6a90109 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -22,7 +22,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public Task StartAsync(MqttServerOptions options) { @@ -90,7 +90,7 @@ namespace MQTTnet.Implementations { var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { @@ -114,7 +114,7 @@ namespace MQTTnet.Implementations await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index 3c1300c..7206f54 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Implementations private bool _isRunning; - public event EventHandler ClientConnected; + public event Action ClientAccepted; public async Task StartAsync(MqttServerOptions options) { @@ -57,7 +57,7 @@ namespace MQTTnet.Implementations try { var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); - ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter)); + ClientAccepted?.Invoke(clientAdapter); } catch (Exception exception) { diff --git a/MQTTnet.Core/Adapter/IMqttServerAdapter.cs b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs index 416139a..b1546ef 100644 --- a/MQTTnet.Core/Adapter/IMqttServerAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs @@ -6,7 +6,7 @@ namespace MQTTnet.Core.Adapter { public interface IMqttServerAdapter { - event EventHandler ClientConnected; + event Action ClientAccepted; Task StartAsync(MqttServerOptions options); Task StopAsync(); diff --git a/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs index fc0fd8b..ade4d9d 100644 --- a/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs @@ -1,17 +1,15 @@ -using System; +using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Adapter { public class MqttClientConnectedEventArgs : EventArgs { - public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter) + public MqttClientConnectedEventArgs(ConnectedMqttClient client) { - Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier)); - ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + Client = client ?? throw new ArgumentNullException(nameof(client)); } - public string Identifier { get; } - - public IMqttCommunicationAdapter ClientAdapter { get; } + public ConnectedMqttClient Client { get; } } } diff --git a/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs b/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs index 48d49c1..8316d40 100644 --- a/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs @@ -1,17 +1,15 @@ -using System; +using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Adapter { public class MqttClientDisconnectedEventArgs : EventArgs { - public MqttClientDisconnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter) + public MqttClientDisconnectedEventArgs(ConnectedMqttClient client) { - Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier)); - ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + Client = client ?? throw new ArgumentNullException(nameof(client)); } - - public string Identifier { get; } - - public IMqttCommunicationAdapter ClientAdapter { get; } + + public ConnectedMqttClient Client { get; } } } diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index c054fed..023feb8 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -11,8 +11,7 @@ namespace MQTTnet.Core.Server event EventHandler ClientConnected; event EventHandler ClientDisconnected; - IList GetConnectedClients(); - void InjectClient(string identifier, IMqttCommunicationAdapter adapter); + IReadOnlyList GetConnectedClients(); void Publish(MqttApplicationMessage applicationMessage); Task StartAsync(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 2115508..fe1ab38 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -19,8 +19,7 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; - - private string _identifier; + private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; @@ -38,7 +37,7 @@ namespace MQTTnet.Core.Server public IMqttCommunicationAdapter Adapter { get; private set; } - public async Task RunAsync(string identifier, MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) + public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter) { if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -46,7 +45,6 @@ namespace MQTTnet.Core.Server try { - _identifier = identifier; Adapter = adapter; _cancellationTokenSource = new CancellationTokenSource(); @@ -58,11 +56,11 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier); + MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier); + MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } } @@ -79,7 +77,7 @@ namespace MQTTnet.Core.Server Adapter = null; - MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", _identifier); + MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); } public void EnqueuePublishPacket(MqttPublishPacket publishPacket) @@ -92,7 +90,7 @@ namespace MQTTnet.Core.Server } _pendingMessagesQueue.Enqueue(publishPacket); - MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier); + MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); } public void Dispose() @@ -116,12 +114,12 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier); + MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); Stop(); } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier); + MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); Stop(); } } @@ -159,12 +157,12 @@ namespace MQTTnet.Core.Server } else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { - _cancellationTokenSource.Cancel(); + Stop(); } else { - MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet); - _cancellationTokenSource.Cancel(); + MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + Stop(); } } @@ -181,7 +179,7 @@ namespace MQTTnet.Core.Server { if (publishPacket.Retain) { - await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(_identifier, publishPacket); + await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 71502ac..981cdd3 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -25,16 +25,18 @@ namespace MQTTnet.Core.Server public event EventHandler ApplicationMessageReceived; + public event EventHandler ClientConnected; + public event EventHandler ClientDisconnected; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } - public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs) + public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter) { var clientId = string.Empty; try { - if (!(await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) + if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket)) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); } @@ -42,12 +44,12 @@ namespace MQTTnet.Core.Server clientId = connectPacket.ClientId; // Switch to the required protocol version before sending any response. - eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; + clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode }).ConfigureAwait(false); @@ -57,13 +59,19 @@ namespace MQTTnet.Core.Server var clientSession = GetOrCreateClientSession(connectPacket); - await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket + await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession }).ConfigureAwait(false); - await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient + { + ClientId = clientId, + ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion + })); + + await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false); } catch (Exception exception) { @@ -71,8 +79,20 @@ namespace MQTTnet.Core.Server } finally { - await eventArgs.ClientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, eventArgs.ClientAdapter)); + try + { + await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); + } + catch (Exception) + { + //ignored + } + + ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient + { + ClientId = clientId, + ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion + })); } } @@ -84,7 +104,7 @@ namespace MQTTnet.Core.Server } } - public IList GetConnectedClients() + public IReadOnlyList GetConnectedClients() { lock (_clientSessions) { diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index fd5a4dc..a2a848f 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -23,10 +23,11 @@ namespace MQTTnet.Core.Server _clientSessionsManager = new MqttClientSessionsManager(options); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); + _clientSessionsManager.ClientConnected += OnClientConnected; _clientSessionsManager.ClientDisconnected += OnClientDisconnected; } - public IList GetConnectedClients() + public IReadOnlyList GetConnectedClients() { return _clientSessionsManager.GetConnectedClients(); } @@ -42,14 +43,6 @@ namespace MQTTnet.Core.Server _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); } - public void InjectClient(string identifier, IMqttCommunicationAdapter adapter) - { - if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started."); - - OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter)); - } - public async Task StartAsync() { if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started."); @@ -60,7 +53,7 @@ namespace MQTTnet.Core.Server foreach (var adapter in _adapters) { - adapter.ClientConnected += OnClientConnected; + adapter.ClientAccepted += OnClientAccepted; await adapter.StartAsync(_options); } @@ -75,7 +68,7 @@ namespace MQTTnet.Core.Server foreach (var adapter in _adapters) { - adapter.ClientConnected -= OnClientConnected; + adapter.ClientAccepted -= OnClientAccepted; await adapter.StopAsync(); } @@ -84,17 +77,20 @@ namespace MQTTnet.Core.Server MqttTrace.Information(nameof(MqttServer), "Stopped."); } + private void OnClientAccepted(IMqttCommunicationAdapter adapter) + { + Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(adapter), _cancellationTokenSource.Token); + } + private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier); + MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); ClientConnected?.Invoke(this, eventArgs); - - Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token); } private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { - MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Identifier); + MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); ClientDisconnected?.Invoke(this, eventArgs); } } diff --git a/MQTTnet.sln b/MQTTnet.sln index 2789163..419fa6a 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -50,18 +50,18 @@ Global {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|x86 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|x86 + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|x86 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|x86 + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.Build.0 = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.ActiveCfg = Debug|Any CPU diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 2e20398..2a03359 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -1,115 +1,20 @@ - - - + + - Debug - AnyCPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} - Library - Properties - MQTTnet.Core.Tests - MQTTnet.Core.Tests - v4.5.2 - 512 - + Exe + Full + netcoreapp2.0 - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - - - bin\Any CPU\ - TRACE - true - pdbonly - AnyCPU - prompt - MinimumRecommendedRules.ruleset - - - true - bin\x64\Debug\ - DEBUG;TRACE - full - x64 - prompt - MinimumRecommendedRules.ruleset - - - bin\x64\Release\ - TRACE - true - pdbonly - x64 - prompt - MinimumRecommendedRules.ruleset - - - bin\x64\Any CPU\ - TRACE - true - pdbonly - x64 - prompt - MinimumRecommendedRules.ruleset - - - x86 - bin\x86\Debug\ - - - x86 - bin\x86\Release\ - - - x86 - bin\x86\Any CPU\ - - - - - - - - - - - - - - - - - - + - + + + + - - {2ecb99e4-72d0-4c23-99ba-93d511d3967d} - MQTTnet.Core - + + - - + \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 83978f3..19f96c6 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -6,6 +6,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Tests { @@ -48,12 +49,13 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_WillMessage() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", willMessage, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -63,7 +65,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(1000); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(1, receivedMessagesCount); } @@ -71,11 +73,12 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_Unsubscribe() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -97,7 +100,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); - s.StopAsync(); + await s.StopAsync(); await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); @@ -106,10 +109,11 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_Publish() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -120,23 +124,10 @@ namespace MQTTnet.Core.Tests s.Publish(message); await Task.Delay(500); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(1, receivedMessagesCount); - } - - private static MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server) - { - var adapterA = new TestMqttCommunicationAdapter(); - var adapterB = new TestMqttCommunicationAdapter(); - adapterA.Partner = adapterB; - adapterB.Partner = adapterA; - - var client = new MqttClient(new MqttClientOptions(), adapterA); - server.InjectClient(clientId, adapterB); - client.ConnectAsync(willMessage).Wait(); - return client; - } + } private async Task TestPublishAsync( string topic, @@ -145,11 +136,12 @@ namespace MQTTnet.Core.Tests MqttQualityOfServiceLevel filterQualityOfServiceLevel, int expectedReceivedMessagesCount) { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -162,7 +154,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(500); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); } diff --git a/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs b/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index 9cd27a9..0000000 --- a/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Reflection; -using System.Runtime.InteropServices; - -[assembly: AssemblyTitle("MQTTnet.Core.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Christian Kratky")] -[assembly: AssemblyProduct("MQTTnet")] -[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] -[assembly: ComVisible(false)] -[assembly: Guid("a7ff0c91-25de-4ba6-b39e-f54e8dadf1cc")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index 6c4c4a8..ad099d2 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -2,16 +2,52 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; +using MQTTnet.Core.Client; namespace MQTTnet.Core.Tests { public class TestMqttServerAdapter : IMqttServerAdapter { - public event EventHandler ClientConnected; + public event Action ClientAccepted; - public void FireClientConnectedEvent(MqttClientConnectedEventArgs eventArgs) + public async Task ConnectTestClient(IMqttServer server, string clientId, MqttApplicationMessage willMessage = null) { - ClientConnected?.Invoke(this, eventArgs); + var adapterA = new TestMqttCommunicationAdapter(); + var adapterB = new TestMqttCommunicationAdapter(); + adapterA.Partner = adapterB; + adapterB.Partner = adapterA; + + var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA); + var connected = WaitForClientToConnect(server, clientId); + FireClientAcceptedEvent(adapterB); + await client.ConnectAsync(willMessage); + await connected; + + return client; + } + + private static Task WaitForClientToConnect(IMqttServer s, string clientId) + { + var tcs = new TaskCompletionSource(); + + EventHandler handler = null; + handler = (sender, args) => + { + if (args.Client.ClientId == clientId) + { + s.ClientConnected -= handler; + tcs.SetResult(null); + } + }; + + s.ClientConnected += handler; + + return tcs.Task; + } + + private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter) + { + ClientAccepted?.Invoke(adapter); } public Task StartAsync(MqttServerOptions options)