diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index cbe7582..023feb8 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -11,8 +11,7 @@ namespace MQTTnet.Core.Server event EventHandler ClientConnected; event EventHandler ClientDisconnected; - IList GetConnectedClients(); - void InjectClient(IMqttCommunicationAdapter adapter); + IReadOnlyList GetConnectedClients(); void Publish(MqttApplicationMessage applicationMessage); Task StartAsync(); diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 315f991..fe1ab38 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -157,12 +157,12 @@ namespace MQTTnet.Core.Server } else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) { - _cancellationTokenSource.Cancel(); + Stop(); } else { MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); - _cancellationTokenSource.Cancel(); + Stop(); } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 599d168..981cdd3 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -104,7 +104,7 @@ namespace MQTTnet.Core.Server } } - public IList GetConnectedClients() + public IReadOnlyList GetConnectedClients() { lock (_clientSessions) { diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 5b5f70a..a2a848f 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -27,7 +27,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager.ClientDisconnected += OnClientDisconnected; } - public IList GetConnectedClients() + public IReadOnlyList GetConnectedClients() { return _clientSessionsManager.GetConnectedClients(); } @@ -43,14 +43,6 @@ namespace MQTTnet.Core.Server _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); } - public void InjectClient(IMqttCommunicationAdapter adapter) - { - if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started."); - - OnClientAccepted(adapter); - } - public async Task StartAsync() { if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started."); diff --git a/MQTTnet.sln b/MQTTnet.sln index 2789163..419fa6a 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -50,18 +50,18 @@ Global {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|x86 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|x86 + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|x64 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|x86 - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|x86 + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU + {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.Build.0 = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.ActiveCfg = Debug|Any CPU diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 2e20398..2a03359 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -1,115 +1,20 @@ - - - + + - Debug - AnyCPU - {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} - Library - Properties - MQTTnet.Core.Tests - MQTTnet.Core.Tests - v4.5.2 - 512 - + Exe + Full + netcoreapp2.0 - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - - - bin\Any CPU\ - TRACE - true - pdbonly - AnyCPU - prompt - MinimumRecommendedRules.ruleset - - - true - bin\x64\Debug\ - DEBUG;TRACE - full - x64 - prompt - MinimumRecommendedRules.ruleset - - - bin\x64\Release\ - TRACE - true - pdbonly - x64 - prompt - MinimumRecommendedRules.ruleset - - - bin\x64\Any CPU\ - TRACE - true - pdbonly - x64 - prompt - MinimumRecommendedRules.ruleset - - - x86 - bin\x86\Debug\ - - - x86 - bin\x86\Release\ - - - x86 - bin\x86\Any CPU\ - - - - - - - - - - - - - - - - - - + - + + + + - - {2ecb99e4-72d0-4c23-99ba-93d511d3967d} - MQTTnet.Core - + + - - + \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 83978f3..19f96c6 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -6,6 +6,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; +using System; namespace MQTTnet.Core.Tests { @@ -48,12 +49,13 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_WillMessage() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", willMessage, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -63,7 +65,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(1000); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(1, receivedMessagesCount); } @@ -71,11 +73,12 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_Unsubscribe() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -97,7 +100,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); - s.StopAsync(); + await s.StopAsync(); await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); @@ -106,10 +109,11 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MqttServer_Publish() { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -120,23 +124,10 @@ namespace MQTTnet.Core.Tests s.Publish(message); await Task.Delay(500); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(1, receivedMessagesCount); - } - - private static MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server) - { - var adapterA = new TestMqttCommunicationAdapter(); - var adapterB = new TestMqttCommunicationAdapter(); - adapterA.Partner = adapterB; - adapterB.Partner = adapterA; - - var client = new MqttClient(new MqttClientOptions(), adapterA); - server.InjectClient(clientId, adapterB); - client.ConnectAsync(willMessage).Wait(); - return client; - } + } private async Task TestPublishAsync( string topic, @@ -145,11 +136,12 @@ namespace MQTTnet.Core.Tests MqttQualityOfServiceLevel filterQualityOfServiceLevel, int expectedReceivedMessagesCount) { - var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() }); - s.StartAsync(); + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + await s.StartAsync(); - var c1 = ConnectTestClient("c1", null, s); - var c2 = ConnectTestClient("c2", null, s); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; @@ -162,7 +154,7 @@ namespace MQTTnet.Core.Tests await Task.Delay(500); - s.StopAsync(); + await s.StopAsync(); Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); } diff --git a/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs b/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index 9cd27a9..0000000 --- a/Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Reflection; -using System.Runtime.InteropServices; - -[assembly: AssemblyTitle("MQTTnet.Core.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Christian Kratky")] -[assembly: AssemblyProduct("MQTTnet")] -[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] -[assembly: ComVisible(false)] -[assembly: Guid("a7ff0c91-25de-4ba6-b39e-f54e8dadf1cc")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index 6c4c4a8..ad099d2 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -2,16 +2,52 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; +using MQTTnet.Core.Client; namespace MQTTnet.Core.Tests { public class TestMqttServerAdapter : IMqttServerAdapter { - public event EventHandler ClientConnected; + public event Action ClientAccepted; - public void FireClientConnectedEvent(MqttClientConnectedEventArgs eventArgs) + public async Task ConnectTestClient(IMqttServer server, string clientId, MqttApplicationMessage willMessage = null) { - ClientConnected?.Invoke(this, eventArgs); + var adapterA = new TestMqttCommunicationAdapter(); + var adapterB = new TestMqttCommunicationAdapter(); + adapterA.Partner = adapterB; + adapterB.Partner = adapterA; + + var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA); + var connected = WaitForClientToConnect(server, clientId); + FireClientAcceptedEvent(adapterB); + await client.ConnectAsync(willMessage); + await connected; + + return client; + } + + private static Task WaitForClientToConnect(IMqttServer s, string clientId) + { + var tcs = new TaskCompletionSource(); + + EventHandler handler = null; + handler = (sender, args) => + { + if (args.Client.ClientId == clientId) + { + s.ClientConnected -= handler; + tcs.SetResult(null); + } + }; + + s.ClientConnected += handler; + + return tcs.Task; + } + + private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter) + { + ClientAccepted?.Invoke(adapter); } public Task StartAsync(MqttServerOptions options)