From 1ca9b0bd62896c1ea79c4496d6cdb67d6f5e67c7 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 12:45:14 +0200 Subject: [PATCH 1/6] added aspnetcore package and adapted sample app --- .gitignore | 2 + .../ApplicationBuilderExtensions.cs | 42 ++++++++++++++ .../MQTTnet.AspnetCore.csproj | 17 ++++++ .../MQTTnet.AspnetCore/MqttHostedServer.cs | 29 ++++++++++ .../MqttWebSocketServerAdapter.cs | 10 +--- .../ServiceCollectionExtensions.cs | 25 +++++++++ .../ServiceCollectionExtensions.cs | 20 ++++--- MQTTnet.Core/Server/MqttServer.cs | 2 +- MQTTnet.sln | 19 +++++++ .../MQTTnet.TestApp.AspNetCore2.csproj | 10 +++- Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs | 56 ++++++++++++------- .../MQTTnet.TestApp.AspNetCore2/package.json | 14 +++++ .../MQTTnet.TestApp.AspNetCore2/tsconfig.json | 12 ++++ .../wwwroot/Index.html | 23 ++++++++ .../wwwroot/app/app.ts | 16 ++++++ 15 files changed, 258 insertions(+), 39 deletions(-) create mode 100644 Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs create mode 100644 Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj create mode 100644 Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs rename {Tests/MQTTnet.TestApp.AspNetCore2 => Frameworks/MQTTnet.AspnetCore}/MqttWebSocketServerAdapter.cs (86%) create mode 100644 Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs create mode 100644 Tests/MQTTnet.TestApp.AspNetCore2/package.json create mode 100644 Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json create mode 100644 Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html create mode 100644 Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts diff --git a/.gitignore b/.gitignore index b665d09..39550dc 100644 --- a/.gitignore +++ b/.gitignore @@ -289,3 +289,5 @@ __pycache__/ Build/nuget.exe +*.js +*.map diff --git a/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs new file mode 100644 index 0000000..87317a7 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs @@ -0,0 +1,42 @@ +using System; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using MQTTnet.AspNetCore; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public static class ApplicationBuilderExtensions + { + public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app, string path = "/mqtt") + { + app.UseWebSockets(); + app.Use(async (context, next) => + { + if (context.Request.Path == path && context.WebSockets.IsWebSocketRequest) + { + var adapter = app.ApplicationServices.GetRequiredService(); + using (var webSocket = await context.WebSockets.AcceptWebSocketAsync("mqtt")) + { + await adapter.AcceptWebSocketAsync(webSocket); + } + } + else + { + await next(); + } + }); + + return app; + } + + public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action configure) + { + var server = app.ApplicationServices.GetRequiredService(); + + configure(server); + + return app; + } + } +} diff --git a/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj new file mode 100644 index 0000000..5142cc1 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.0 + + + + + + + + + + + + + diff --git a/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs new file mode 100644 index 0000000..b5326ab --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -0,0 +1,29 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public class MqttHostedServer : MqttServer, IHostedService + { + public MqttHostedServer(IOptions options, IEnumerable adapters, ILogger logger, MqttClientSessionsManager clientSessionsManager) + : base(options, adapters, logger, clientSessionsManager) + { + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return StartAsync(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return StopAsync(); + } + } +} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs similarity index 86% rename from Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs rename to Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 90ad49f..5cd5d55 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -5,21 +5,17 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Channel; -using MQTTnet.Core.Serializer; using MQTTnet.Core.Server; using MQTTnet.Implementations; -using Microsoft.Extensions.Logging; -namespace MQTTnet.TestApp.AspNetCore2 +namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable { - private readonly ILogger _logger; private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; - public MqttWebSocketServerAdapter(ILogger logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) + public MqttWebSocketServerAdapter(IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); } @@ -32,7 +28,7 @@ namespace MQTTnet.TestApp.AspNetCore2 public Task StopAsync() { - return Task.FromResult(0); + return Task.CompletedTask; } public Task AcceptWebSocketAsync(WebSocket webSocket) diff --git a/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..b0297b2 --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using MQTTnet.AspNetCore; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Server; + +namespace MQTTnet.AspnetCore +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) + { + services.AddMqttServerServices(); + + services.AddSingleton(s => s.GetService()); + services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); + + return services; + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs index f8f647a..019f39b 100644 --- a/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs +++ b/Frameworks/MQTTnet.NetStandard/ServiceCollectionExtensions.cs @@ -15,14 +15,20 @@ namespace MQTTnet { public static IServiceCollection AddMqttServer(this IServiceCollection services) { - services.AddOptions(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); + services.AddMqttServerServices(); + + services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + return services; + } - services.AddSingleton(); - services.AddSingleton(); + public static IServiceCollection AddMqttServerServices(this IServiceCollection services) + { + services.AddOptions(); + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); + services.AddSingleton(s => s.GetService()); services.AddTransient(); services.AddTransient(); @@ -42,7 +48,7 @@ namespace MQTTnet public static IServiceCollection AddMqttClient(this IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(s => s.GetService()); services.AddTransient(); services.AddTransient(); diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index b5c25de..0d208c0 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -9,7 +9,7 @@ using System.Linq; namespace MQTTnet.Core.Server { - public sealed class MqttServer : IMqttServer + public class MqttServer : IMqttServer { private readonly ILogger _logger; private readonly MqttClientSessionsManager _clientSessionsManager; diff --git a/MQTTnet.sln b/MQTTnet.sln index 56a56aa5..263fee0 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspnetCore", "Frameworks\MQTTnet.AspnetCore\MQTTnet.AspnetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -163,6 +165,22 @@ Global {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU + {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -174,6 +192,7 @@ Global {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj index ce35f43..2f04208 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -2,10 +2,11 @@ netcoreapp2.0 + 2.3 - + @@ -13,8 +14,11 @@ - - + + + + + diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index a4f5298..fc52a0b 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -1,11 +1,13 @@ -using System.Collections.Generic; -using System.Diagnostics; +using System; +using System.IO; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Logging; -using MQTTnet.Core.Adapter; -using MQTTnet.Core.Server; +using MQTTnet.AspnetCore; +using MQTTnet.Core; namespace MQTTnet.TestApp.AspNetCore2 { @@ -13,35 +15,47 @@ namespace MQTTnet.TestApp.AspNetCore2 { public void ConfigureServices(IServiceCollection services) { - services.AddMqttServer(); - services.AddSingleton(); - services.AddSingleton(); + services.AddHostedMqttServer(); } - public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { loggerFactory.AddConsole(LogLevel.Debug); loggerFactory.AddDebug(); - - var adapter = app.ApplicationServices.GetService(); - var mqttServer = app.ApplicationServices.GetService(); - await mqttServer.StartAsync(); - app.UseWebSockets(); - app.Use(async (context, next) => + app.UseMqttEndpoint(); + app.UseMqttServer(async server => { - if (context.WebSockets.IsWebSocketRequest) + var msg = new MqttApplicationMessageBuilder() + .WithPayload("Mqtt is Awesome") + .WithTopic("message") + .Build(); + + while (true) { - using (var webSocket = await context.WebSockets.AcceptWebSocketAsync()) - { - await adapter.AcceptWebSocketAsync(webSocket); - } + server.Publish(msg); + await Task.Delay(TimeSpan.FromSeconds(2)); } - else + }); + + app.Use((context, next) => + { + if (context.Request.Path == "/") { - await next(); + context.Request.Path = "/Index.html"; } + + return next(); }); + + app.UseStaticFiles(); + + + app.UseStaticFiles( new StaticFileOptions() + { + RequestPath = "/node_modules", + FileProvider = new PhysicalFileProvider( Path.Combine(env.ContentRootPath, "node_modules" ) ) + } ); } } } diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/package.json b/Tests/MQTTnet.TestApp.AspNetCore2/package.json new file mode 100644 index 0000000..ad48ca1 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/package.json @@ -0,0 +1,14 @@ +{ + "version": "1.0.0", + "name": "mqtt.test", + "private": true, + "devDependencies": { + }, + + "dependencies": { + "mqtt": "2.13.1", + "@types/node": "8.0.46", + "systemjs": "0.20.19", + "typescript": "2.3.4" + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json b/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json new file mode 100644 index 0000000..5794ac5 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "noImplicitAny": false, + "noEmitOnError": true, + "removeComments": false, + "sourceMap": true, + "target": "es5" + }, + "exclude": [ + "node_modules" + ] +} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html new file mode 100644 index 0000000..522f002 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html @@ -0,0 +1,23 @@ + + + + + MQTT test + + + + + + \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts new file mode 100644 index 0000000..0a801c4 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts @@ -0,0 +1,16 @@ +import { connect } from "mqtt"; + +var client = connect('ws://' + location.host + '/mqtt', + { + clientId: "client1", + }); + +client.on('connect', () => { + client.subscribe('presence'); + client.publish('presence', 'Hello mqtt'); +}); + +client.on('message', (topic, message) => { + // message is Buffer + console.log(message.toString()); +}) \ No newline at end of file From d594b1447fec66b970d84cb5efcc9fa42c5e96e7 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 12:45:26 +0200 Subject: [PATCH 2/6] fixed index out of range --- .../MQTTnet.NetStandard/Implementations/WebSocketStream.cs | 1 + .../Implementations/WebSocketStream.cs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index d38de6f..f037b38 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -41,6 +41,7 @@ namespace MQTTnet.Implementations { var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); currentOffset += response.Count; + count -= response.Count; if (response.MessageType == WebSocketMessageType.Close) { diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs index 1843c6a..2a269c0 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs @@ -27,7 +27,8 @@ namespace MQTTnet.Implementations { var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); currentOffset += response.Count; - + count -= response.Count; + if (response.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); From a68343a306c2a561ab7e9ed2b23790da774f6310 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 13:14:18 +0200 Subject: [PATCH 3/6] fixed duplicated console log --- Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index fc52a0b..56df889 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -20,9 +20,6 @@ namespace MQTTnet.TestApp.AspNetCore2 public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { - loggerFactory.AddConsole(LogLevel.Debug); - loggerFactory.AddDebug(); - app.UseMqttEndpoint(); app.UseMqttServer(async server => { From e80d21e8520879b1b8db9ed0293870f03352e43d Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 13:15:02 +0200 Subject: [PATCH 4/6] generate exception in case websocket is closed so stream does not return array of 255 --- .../MQTTnet.NetStandard/Implementations/WebSocketStream.cs | 6 ++++++ .../Implementations/WebSocketStream.cs | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index f037b38..c27e480 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -3,6 +3,7 @@ using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Core.Exceptions; namespace MQTTnet.Implementations { @@ -49,6 +50,11 @@ namespace MQTTnet.Implementations } } + if (_webSocket.State == WebSocketState.Closed) + { + throw new MqttCommunicationException( "connection closed" ); + } + return currentOffset - offset; } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs index 2a269c0..718b5ee 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs @@ -3,6 +3,7 @@ using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Core.Exceptions; namespace MQTTnet.Implementations { @@ -35,6 +36,11 @@ namespace MQTTnet.Implementations } } + if (_webSocket.State == WebSocketState.Closed) + { + throw new MqttCommunicationException("connection closed"); + } + return currentOffset - offset; } From 0e0fcabc04fc106718d6c20eb8c43dc04a312f1b Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 13:15:35 +0200 Subject: [PATCH 5/6] gently close the connection before leaving the page --- Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts index 0a801c4..166a75a 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts @@ -13,4 +13,8 @@ client.on('connect', () => { client.on('message', (topic, message) => { // message is Buffer console.log(message.toString()); -}) \ No newline at end of file +}); + +window.onbeforeunload = () => { + client.end(); +}; \ No newline at end of file From 2cd3bd7588d0fc46988fe5daaa9267bb201d0d41 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Tue, 24 Oct 2017 15:35:02 +0200 Subject: [PATCH 6/6] added some ui to send test msgs --- Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs | 8 ++-- .../wwwroot/Index.html | 12 +++++ .../wwwroot/app/app.ts | 48 ++++++++++++++++--- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index 56df889..a71978c 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -24,14 +24,14 @@ namespace MQTTnet.TestApp.AspNetCore2 app.UseMqttServer(async server => { var msg = new MqttApplicationMessageBuilder() - .WithPayload("Mqtt is Awesome") - .WithTopic("message") - .Build(); + .WithPayload("Mqtt is awesome") + .WithTopic("message"); while (true) { - server.Publish(msg); + server.Publish(msg.Build()); await Task.Delay(TimeSpan.FromSeconds(2)); + msg.WithPayload("Mqtt is still awesome at " + DateTime.Now); } }); diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html index 522f002..9cc67e4 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/Index.html @@ -19,5 +19,17 @@ }); System.import('app/app'); + +
+

+ + + +
+ +
+
    +
    + \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts index 166a75a..3ce9856 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts +++ b/Tests/MQTTnet.TestApp.AspNetCore2/wwwroot/app/app.ts @@ -5,16 +5,52 @@ var client = connect('ws://' + location.host + '/mqtt', clientId: "client1", }); +window.onbeforeunload = () => { + client.end(); +}; + +var publishButton = document.getElementById("publish"); +var topicInput = document.getElementById("topic"); +var msgInput = document.getElementById("msg"); +var stateParagraph = document.getElementById("state"); +var msgsList = document.getElementById("msgs"); + +publishButton.onclick = click => { + var topic = topicInput.value; + var msg = msgInput.value; + client.publish(topic, msg); +}; + client.on('connect', () => { - client.subscribe('presence'); + client.subscribe('#', { qos: 0 }, (err, granted) => { + console.log(err); + }); client.publish('presence', 'Hello mqtt'); + + stateParagraph.innerText = "connected"; +}); + +client.on("error", e => { + showMsg("error: " + e.message); +}); + +client.on("reconnect", () => { + stateParagraph.innerText = "reconnecting"; }); client.on('message', (topic, message) => { - // message is Buffer - console.log(message.toString()); + showMsg(topic + ": " + message.toString()); }); -window.onbeforeunload = () => { - client.end(); -}; \ No newline at end of file +function showMsg(msg: string) { + //console.log(msg); + + var node = document.createElement("LI"); + node.appendChild(document.createTextNode(msg)); + + msgsList.appendChild(node); + + if (msgsList.childElementCount > 20) { + msgsList.removeChild(msgsList.childNodes[0]); + } +} \ No newline at end of file