diff --git a/.gitignore b/.gitignore index b665d09..39550dc 100644 --- a/.gitignore +++ b/.gitignore @@ -289,3 +289,5 @@ __pycache__/ Build/nuget.exe +*.js +*.map diff --git a/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs new file mode 100644 index 0000000..87317a7 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs @@ -0,0 +1,42 @@ +using System; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using MQTTnet.AspNetCore; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public static class ApplicationBuilderExtensions + { + public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app, string path = "/mqtt") + { + app.UseWebSockets(); + app.Use(async (context, next) => + { + if (context.Request.Path == path && context.WebSockets.IsWebSocketRequest) + { + var adapter = app.ApplicationServices.GetRequiredService(); + using (var webSocket = await context.WebSockets.AcceptWebSocketAsync("mqtt")) + { + await adapter.AcceptWebSocketAsync(webSocket); + } + } + else + { + await next(); + } + }); + + return app; + } + + public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action configure) + { + var server = app.ApplicationServices.GetRequiredService(); + + configure(server); + + return app; + } + } +} diff --git a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj new file mode 100644 index 0000000..5142cc1 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.0 + + + + + + + + + + + + + diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs new file mode 100644 index 0000000..b5326ab --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -0,0 +1,29 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public class MqttHostedServer : MqttServer, IHostedService + { + public MqttHostedServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager) + : base(options, adapters, logger, clientSessionsManager) + { + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return StartAsync(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return StopAsync(); + } + } +} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs similarity index 86% rename from Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs rename to Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 90ad49f..5cd5d55 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -5,21 +5,17 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Channel; -using MQTTnet.Core.Serializer; using MQTTnet.Core.Server; using MQTTnet.Implementations; -using Microsoft.Extensions.Logging; -namespace MQTTnet.TestApp.AspNetCore2 +namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable { - private readonly ILogger _logger; private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; - public MqttWebSocketServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) + public MqttWebSocketServerAdapter(IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); } @@ -32,7 +28,7 @@ namespace MQTTnet.TestApp.AspNetCore2 public Task StopAsync() { - return Task.FromResult(0); + return Task.CompletedTask; } public Task AcceptWebSocketAsync(WebSocket webSocket) diff --git a/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..b0297b2 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using MQTTnet.AspNetCore; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) + { + services.AddMqttServerServices(); + + services.AddSingleton(s => s.GetService()); + services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); + + return services; + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index d38de6f..c27e480 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -3,6 +3,7 @@ using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Core.Exceptions; namespace MQTTnet.Implementations { @@ -41,6 +42,7 @@ namespace MQTTnet.Implementations { var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); currentOffset += response.Count; + count -= response.Count; if (response.MessageType == WebSocketMessageType.Close) { @@ -48,6 +50,11 @@ namespace MQTTnet.Implementations } } + if (_webSocket.State == WebSocketState.Closed) + { + throw new MqttCommunicationException( "connection closed" ); + } + return currentOffset - offset; } diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs index f8f647a..019f39b 100644 --- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs +++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs @@ -15,14 +15,20 @@ namespace MQTTnet { public static IServiceCollection AddMqttServer(this IServiceCollection services) { - services.AddOptions(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); + services.AddMqttServerServices(); + + services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + return services; + } - services.AddSingleton(); - services.AddSingleton(); + public static IServiceCollection AddMqttServerServices(this IServiceCollection services) + { + services.AddOptions(); + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); + services.AddSingleton(s => s.GetService()); services.AddTransient(); services.AddTransient(); @@ -42,7 +48,7 @@ namespace MQTTnet public static IServiceCollection AddMqttClient(this IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(s => s.GetService()); services.AddTransient(); services.AddTransient(); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs index 1843c6a..718b5ee 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs @@ -3,6 +3,7 @@ using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Core.Exceptions; namespace MQTTnet.Implementations { @@ -27,13 +28,19 @@ namespace MQTTnet.Implementations { var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); currentOffset += response.Count; - + count -= response.Count; + if (response.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } } + if (_webSocket.State == WebSocketState.Closed) + { + throw new MqttCommunicationException("connection closed"); + } + return currentOffset - offset; } diff --git a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs index 298de9a..c812ea9 100644 --- a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs +++ b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs @@ -10,5 +10,7 @@ namespace MQTTnet.Core.ManagedClient TimeSpan AutoReconnectDelay { get; } IManagedMqttClientStorage Storage { get; } + + Func PasswordProvider { get; } } } \ No newline at end of file diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 6b2e1dd..0d37fe7 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -277,6 +277,7 @@ namespace MQTTnet.Core.ManagedClient try { + _options.PasswordProvider?.Invoke(_options); await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); return ReconnectionResult.Reconnected; } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs index 2e24fd0..45b2af3 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs @@ -10,5 +10,8 @@ namespace MQTTnet.Core.ManagedClient public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); public IManagedMqttClientStorage Storage { get; set; } + + public Func PasswordProvider { get; set; } + } } diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index b5c25de..0d208c0 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -9,7 +9,7 @@ using System.Linq; namespace MQTTnet.Core.Server { - public sealed class MqttServer : IMqttServer + public class MqttServer : IMqttServer { private readonly ILogger _logger; private readonly MqttClientSessionsManager _clientSessionsManager; diff --git a/MQTTnet.sln b/MQTTnet.sln index 56a56aa5..263fee0 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspnetCore", "Frameworks\MQTTnet.AspnetCore\MQTTnet.AspnetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -163,6 +165,22 @@ Global {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -174,6 +192,7 @@ Global {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj index ce35f43..2f04208 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -2,10 +2,11 @@ netcoreapp2.0 + 2.3 - + @@ -13,8 +14,11 @@ - - + + + + + diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index a4f5298..a71978c 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -1,11 +1,13 @@ -using System.Collections.Generic; -using System.Diagnostics; +using System; +using System.IO; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Logging; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Server; +using MQTTnet.AspnetCore; +using MQTTnet.Core; namespace MQTTnet.TestApp.AspNetCore2 { @@ -13,35 +15,44 @@ namespace MQTTnet.TestApp.AspNetCore2 { public void ConfigureServices(IServiceCollection services) { - services.AddMqttServer(); - services.AddSingleton(); - services.AddSingleton(); + services.AddHostedMqttServer(); } - public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { - loggerFactory.AddConsole(LogLevel.Debug); - loggerFactory.AddDebug(); - - var adapter = app.ApplicationServices.GetService(); - var mqttServer = app.ApplicationServices.GetService(); - await mqttServer.StartAsync(); - - app.UseWebSockets(); - app.Use(async (context, next) => + app.UseMqttEndpoint(); + app.UseMqttServer(async server => { - if (context.WebSockets.IsWebSocketRequest) + var msg = new MqttApplicationMessageBuilder() + .WithPayload("Mqtt is awesome") + .WithTopic("message"); + + while (true) { - using (var webSocket = await context.WebSockets.AcceptWebSocketAsync()) - { - await adapter.AcceptWebSocketAsync(webSocket); - } + server.Publish(msg.Build()); + await Task.Delay(TimeSpan.FromSeconds(2)); + msg.WithPayload("Mqtt is still awesome at " + DateTime.Now); } - else + }); + + app.Use((context, next) => + { + if (context.Request.Path == "/") { - await next(); + context.Request.Path = "/Index.html"; } + + return next(); }); + + app.UseStaticFiles(); + + + app.UseStaticFiles( new StaticFileOptions() + { + RequestPath = "/node_modules", + FileProvider = new PhysicalFileProvider( Path.Combine(env.ContentRootPath, "node_modules" ) ) + } ); } } } diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/package.json b/Tests/MQTTnet.TestApp.AspNetCore2/package.json new file mode 100644 index 0000000..ad48ca1 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/package.json @@ -0,0 +1,14 @@ +{ + "version": "1.0.0", + "name": "mqtt.test", + "private": true, + "devDependencies": { + }, + + "dependencies": { + "mqtt": "2.13.1", + "@types/node": "8.0.46", + "systemjs": "0.20.19", + "typescript": "2.3.4" + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json b/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json new file mode 100644 index 0000000..5794ac5 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "noImplicitAny": false, + "noEmitOnError": true, + "removeComments": false, + "sourceMap": true, + "target": "es5" + }, + "exclude": [ + "node_modules" + ] +} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html new file mode 100644 index 0000000..9cc67e4 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html @@ -0,0 +1,35 @@ + + + + + MQTT test + + + + + +
+

+ + + +
+ +
+
    +
    + + + \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts new file mode 100644 index 0000000..3ce9856 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts @@ -0,0 +1,56 @@ +import { connect } from "mqtt"; + +var client = connect('ws://' + location.host + '/mqtt', + { + clientId: "client1", + }); + +window.onbeforeunload = () => { + client.end(); +}; + +var publishButton = document.getElementById("publish"); +var topicInput = document.getElementById("topic"); +var msgInput = document.getElementById("msg"); +var stateParagraph = document.getElementById("state"); +var msgsList = document.getElementById("msgs"); + +publishButton.onclick = click => { + var topic = topicInput.value; + var msg = msgInput.value; + client.publish(topic, msg); +}; + +client.on('connect', () => { + client.subscribe('#', { qos: 0 }, (err, granted) => { + console.log(err); + }); + client.publish('presence', 'Hello mqtt'); + + stateParagraph.innerText = "connected"; +}); + +client.on("error", e => { + showMsg("error: " + e.message); +}); + +client.on("reconnect", () => { + stateParagraph.innerText = "reconnecting"; +}); + +client.on('message', (topic, message) => { + showMsg(topic + ": " + message.toString()); +}); + +function showMsg(msg: string) { + //console.log(msg); + + var node = document.createElement("LI"); + node.appendChild(document.createTextNode(msg)); + + msgsList.appendChild(node); + + if (msgsList.childElementCount > 20) { + msgsList.removeChild(msgsList.childNodes[0]); + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index b54de06..05dd0cf 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -7,6 +7,9 @@ using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using System.IO; +using Newtonsoft.Json; +using System.Collections.Generic; namespace MQTTnet.TestApp.NetCore { @@ -22,18 +25,32 @@ namespace MQTTnet.TestApp.NetCore services.GetService() .AddConsole(); + ClientRetainedMessageHandler ms = new ClientRetainedMessageHandler(); + Func func = delegate (ManagedMqttClientOptions managedMqttClientOptions) + { + return "password"; + }; var options = new ManagedMqttClientOptions { ClientOptions = new MqttClientTcpOptions { Server = "broker.hivemq.com", - ClientId = "MQTTnetManagedClientTest" + ClientId = "MQTTnetManagedClientTest", + Password = "pippo", }, - AutoReconnectDelay = TimeSpan.FromSeconds(1) + AutoReconnectDelay = TimeSpan.FromSeconds(1), + Storage = ms, + PasswordProvider = o => + { + //o.ClientOptions.Password = GetPassword(); + return o.ClientOptions.Password; + } }; + + try { var managedClient = services.GetRequiredService(); @@ -59,5 +76,39 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine(e); } } + + + public static string GetPassword() + { + return "password"; + } + + + public class ClientRetainedMessageHandler : IManagedMqttClientStorage + { + private const string Filename = @"RetainedMessages.json"; + + public Task SaveQueuedMessagesAsync(IList messages) + { + File.WriteAllText(Filename, JsonConvert.SerializeObject(messages)); + return Task.FromResult(0); + } + + public Task> LoadQueuedMessagesAsync() + { + IList retainedMessages; + if (File.Exists(Filename)) + { + var json = File.ReadAllText(Filename); + retainedMessages = JsonConvert.DeserializeObject>(json); + } + else + { + retainedMessages = new List(); + } + + return Task.FromResult(retainedMessages); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/messages.bin b/Tests/MQTTnet.TestApp.NetCore/messages.bin deleted file mode 100644 index 59505fa..0000000 Binary files a/Tests/MQTTnet.TestApp.NetCore/messages.bin and /dev/null differ