diff --git a/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs b/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs index 9e12028..98ebce5 100644 --- a/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs +++ b/MQTTnet.Core/Client/MqttApplicationMessageReceivedEventArgs.cs @@ -4,11 +4,14 @@ namespace MQTTnet.Core.Client { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { - public MqttApplicationMessageReceivedEventArgs(MqttApplicationMessage applicationMessage) + public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) { + ClientId = clientId; ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); } + public string ClientId { get; } + public MqttApplicationMessage ApplicationMessage { get; } } } diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index e33b258..b54288c 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -305,7 +305,7 @@ namespace MQTTnet.Core.Client try { var applicationMessage = publishPacket.ToApplicationMessage(); - ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); + ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage)); } catch (Exception exception) { diff --git a/MQTTnet.Core/Client/MqttClientExtensions.cs b/MQTTnet.Core/Client/MqttClientExtensions.cs index f664dbd..ff61548 100644 --- a/MQTTnet.Core/Client/MqttClientExtensions.cs +++ b/MQTTnet.Core/Client/MqttClientExtensions.cs @@ -2,13 +2,12 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using MQTTnet.Core.Packets; namespace MQTTnet.Core.Client { public static class MqttClientExtensions { - public static Task PublishAsync(this IMqttClient client, params MqttApplicationMessage[] applicationMessages) + public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages) { if (client == null) throw new ArgumentNullException(nameof(client)); if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index db3e1b0..58c4b74 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -1,18 +1,16 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using MQTTnet.Core.Client; namespace MQTTnet.Core.Server { - public interface IMqttServer + public interface IMqttServer : IApplicationMessageReceiver, IApplicationMessagePublisher { event EventHandler ClientConnected; event EventHandler ClientDisconnected; - event EventHandler ApplicationMessageReceived; - IList GetConnectedClients(); - void Publish(MqttApplicationMessage applicationMessage); Task StartAsync(); Task StopAsync(); diff --git a/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs b/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs deleted file mode 100644 index 4f2cc80..0000000 --- a/MQTTnet.Core/Server/MqttApplicationMessageReceivedEventArgs.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; - -namespace MQTTnet.Core.Server -{ - public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs - { - public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) - { - ClientId = clientId; - ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); - } - - public string ClientId { get; } - - public MqttApplicationMessage ApplicationMessage { get; } - } -} diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index b621b83..09e0781 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -11,6 +11,7 @@ using MQTTnet.Core.Protocol; using MQTTnet.Core.Serializer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using MQTTnet.Core.Client; namespace MQTTnet.Core.Server { @@ -28,9 +29,9 @@ namespace MQTTnet.Core.Server _mqttClientSesssionFactory = mqttClientSesssionFactory ?? throw new ArgumentNullException(nameof(mqttClientSesssionFactory)); } - public event EventHandler ApplicationMessageReceived; public event EventHandler ClientConnected; public event EventHandler ClientDisconnected; + public event EventHandler ApplicationMessageReceived; public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } public MqttServerOptions Options { get; } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index c651e82..16e2e2e 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -6,6 +6,7 @@ using MQTTnet.Core.Adapter; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Linq; +using MQTTnet.Core.Client; namespace MQTTnet.Core.Server { @@ -45,12 +46,22 @@ namespace MQTTnet.Core.Server public event EventHandler ClientDisconnected; public event EventHandler ApplicationMessageReceived; - public void Publish(MqttApplicationMessage applicationMessage) + public Task PublishAsync(IEnumerable applicationMessages) { - if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); - _options.ApplicationMessageInterceptor?.Invoke(applicationMessage); - _clientSessionsManager.DispatchApplicationMessage(null, applicationMessage); + if (_cancellationTokenSource == null) + { + throw new InvalidOperationException("The server is not started."); + } + + foreach (var applicationMessage in applicationMessages) + { + _options.ApplicationMessageInterceptor?.Invoke(applicationMessage); + _clientSessionsManager.DispatchApplicationMessage(null, applicationMessage); + } + + return Task.FromResult(0); } public async Task StartAsync() diff --git a/MQTTnet.Core/Server/MqttServerOptions.cs b/MQTTnet.Core/Server/MqttServerOptions.cs index 4c6f7ad..25f7cc8 100644 --- a/MQTTnet.Core/Server/MqttServerOptions.cs +++ b/MQTTnet.Core/Server/MqttServerOptions.cs @@ -4,7 +4,7 @@ using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Server { - public sealed class MqttServerOptions + public class MqttServerOptions { public MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; } = new MqttServerDefaultEndpointOptions(); diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 67a6112..d548ff0 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -122,7 +122,7 @@ namespace MQTTnet.Core.Tests var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); - s.Publish(message); + s.PublishAsync(message).Wait(); await Task.Delay(500); await s.StopAsync(); @@ -180,7 +180,13 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_ClearRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttFactory().CreateMqttServer(); + var services = new ServiceCollection() + .AddLogging() + .AddMqttServer() // TODO: Is there maybe an easier way for the library user to set the options? + .AddSingleton(serverAdapter) + .BuildServiceProvider(); + + var s = new MqttFactory(services).CreateMqttServer(); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -207,11 +213,12 @@ namespace MQTTnet.Core.Tests var serverAdapter = new TestMqttServerAdapter(); var services = new ServiceCollection() - .AddMqttServer(options => options.Storage = storage) + .AddLogging() + .AddMqttServer(options => options.Storage = storage) // TODO: Is there maybe an easier way for the library user to set the options? .AddSingleton(serverAdapter) .BuildServiceProvider(); - var s = new MqttFactory().CreateMqttServer(); + var s = new MqttFactory(services).CreateMqttServer(); // TODO: Like here? await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -246,6 +253,7 @@ namespace MQTTnet.Core.Tests var serverAdapter = new TestMqttServerAdapter(); var services = new ServiceCollection() + .AddLogging() .AddMqttServer(options => options.ApplicationMessageInterceptor = Interceptor) .AddSingleton(serverAdapter) .BuildServiceProvider(); diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index a71978c..c5da4e5 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -8,6 +8,7 @@ using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Logging; using MQTTnet.AspnetCore; using MQTTnet.Core; +using MQTTnet.Core.Client; namespace MQTTnet.TestApp.AspNetCore2 { @@ -29,7 +30,7 @@ namespace MQTTnet.TestApp.AspNetCore2 while (true) { - server.Publish(msg.Build()); + server.PublishAsync(msg.Build()).Wait(); await Task.Delay(TimeSpan.FromSeconds(2)); msg.WithPayload("Mqtt is still awesome at " + DateTime.Now); }