@@ -289,3 +289,5 @@ __pycache__/ | |||||
Build/nuget.exe | Build/nuget.exe | ||||
*.js | |||||
*.map |
@@ -0,0 +1,42 @@ | |||||
using System; | |||||
using Microsoft.AspNetCore.Builder; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using MQTTnet.AspNetCore; | |||||
using MQTTnet.Core.Server; | |||||
namespace MQTTnet.AspnetCore | |||||
{ | |||||
public static class ApplicationBuilderExtensions | |||||
{ | |||||
public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app, string path = "/mqtt") | |||||
{ | |||||
app.UseWebSockets(); | |||||
app.Use(async (context, next) => | |||||
{ | |||||
if (context.Request.Path == path && context.WebSockets.IsWebSocketRequest) | |||||
{ | |||||
var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>(); | |||||
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync("mqtt")) | |||||
{ | |||||
await adapter.AcceptWebSocketAsync(webSocket); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
await next(); | |||||
} | |||||
}); | |||||
return app; | |||||
} | |||||
public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action<IMqttServer> configure) | |||||
{ | |||||
var server = app.ApplicationServices.GetRequiredService<IMqttServer>(); | |||||
configure(server); | |||||
return app; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,17 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.0</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.0.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,29 @@ | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Server; | |||||
namespace MQTTnet.AspnetCore | |||||
{ | |||||
public class MqttHostedServer : MqttServer, IHostedService | |||||
{ | |||||
public MqttHostedServer(IOptions<MqttServerOptions> options, IEnumerable<IMqttServerAdapter> adapters, ILogger<MqttServer> logger, MqttClientSessionsManager clientSessionsManager) | |||||
: base(options, adapters, logger, clientSessionsManager) | |||||
{ | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return StartAsync(); | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return StopAsync(); | |||||
} | |||||
} | |||||
} |
@@ -5,21 +5,17 @@ using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Core.Adapter; | using MQTTnet.Core.Adapter; | ||||
using MQTTnet.Core.Channel; | using MQTTnet.Core.Channel; | ||||
using MQTTnet.Core.Serializer; | |||||
using MQTTnet.Core.Server; | using MQTTnet.Core.Server; | ||||
using MQTTnet.Implementations; | using MQTTnet.Implementations; | ||||
using Microsoft.Extensions.Logging; | |||||
namespace MQTTnet.TestApp.AspNetCore2 | |||||
namespace MQTTnet.AspNetCore | |||||
{ | { | ||||
public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable | public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable | ||||
{ | { | ||||
private readonly ILogger<MqttWebSocketServerAdapter> _logger; | |||||
private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; | private readonly IMqttCommunicationAdapterFactory _mqttCommunicationAdapterFactory; | ||||
public MqttWebSocketServerAdapter(ILogger<MqttWebSocketServerAdapter> logger, IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) | |||||
public MqttWebSocketServerAdapter(IMqttCommunicationAdapterFactory mqttCommunicationAdapterFactory) | |||||
{ | { | ||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||||
_mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); | _mqttCommunicationAdapterFactory = mqttCommunicationAdapterFactory ?? throw new ArgumentNullException(nameof(mqttCommunicationAdapterFactory)); | ||||
} | } | ||||
@@ -32,7 +28,7 @@ namespace MQTTnet.TestApp.AspNetCore2 | |||||
public Task StopAsync() | public Task StopAsync() | ||||
{ | { | ||||
return Task.FromResult(0); | |||||
return Task.CompletedTask; | |||||
} | } | ||||
public Task AcceptWebSocketAsync(WebSocket webSocket) | public Task AcceptWebSocketAsync(WebSocket webSocket) |
@@ -0,0 +1,25 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Hosting; | |||||
using MQTTnet.AspNetCore; | |||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Server; | |||||
namespace MQTTnet.AspnetCore | |||||
{ | |||||
public static class ServiceCollectionExtensions | |||||
{ | |||||
public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) | |||||
{ | |||||
services.AddMqttServerServices(); | |||||
services.AddSingleton<IHostedService>(s => s.GetService<MqttHostedServer>()); | |||||
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>()); | |||||
services.AddSingleton<MqttHostedServer>(); | |||||
services.AddSingleton<MqttWebSocketServerAdapter>(); | |||||
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>()); | |||||
return services; | |||||
} | |||||
} | |||||
} |
@@ -3,6 +3,7 @@ using System.IO; | |||||
using System.Net.WebSockets; | using System.Net.WebSockets; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet.Implementations | namespace MQTTnet.Implementations | ||||
{ | { | ||||
@@ -41,6 +42,7 @@ namespace MQTTnet.Implementations | |||||
{ | { | ||||
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); | var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); | ||||
currentOffset += response.Count; | currentOffset += response.Count; | ||||
count -= response.Count; | |||||
if (response.MessageType == WebSocketMessageType.Close) | if (response.MessageType == WebSocketMessageType.Close) | ||||
{ | { | ||||
@@ -48,6 +50,11 @@ namespace MQTTnet.Implementations | |||||
} | } | ||||
} | } | ||||
if (_webSocket.State == WebSocketState.Closed) | |||||
{ | |||||
throw new MqttCommunicationException( "connection closed" ); | |||||
} | |||||
return currentOffset - offset; | return currentOffset - offset; | ||||
} | } | ||||
@@ -15,14 +15,20 @@ namespace MQTTnet | |||||
{ | { | ||||
public static IServiceCollection AddMqttServer(this IServiceCollection services) | public static IServiceCollection AddMqttServer(this IServiceCollection services) | ||||
{ | { | ||||
services.AddOptions(); | |||||
services.AddSingleton<MqttFactory>(); | |||||
services.AddSingleton<IMqttCommunicationAdapterFactory, MqttFactory>(); | |||||
services.AddSingleton<IMqttClientSesssionFactory, MqttFactory>(); | |||||
services.AddMqttServerServices(); | |||||
services.AddSingleton<IMqttServer>(s => s.GetService<MqttServer>()); | |||||
services.AddSingleton<MqttServer>(); | |||||
return services; | |||||
} | |||||
services.AddSingleton<IMqttServer,MqttServer>(); | |||||
services.AddSingleton<MqttServer>(); | |||||
public static IServiceCollection AddMqttServerServices(this IServiceCollection services) | |||||
{ | |||||
services.AddOptions(); | |||||
services.AddSingleton<MqttFactory>(); | |||||
services.AddSingleton<IMqttCommunicationAdapterFactory>(s => s.GetService<MqttFactory>()); | |||||
services.AddSingleton<IMqttClientSesssionFactory>(s => s.GetService<MqttFactory>()); | |||||
services.AddTransient<IMqttServerAdapter, MqttServerAdapter>(); | services.AddTransient<IMqttServerAdapter, MqttServerAdapter>(); | ||||
services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>(); | services.AddTransient<IMqttPacketSerializer, MqttPacketSerializer>(); | ||||
@@ -42,7 +48,7 @@ namespace MQTTnet | |||||
public static IServiceCollection AddMqttClient(this IServiceCollection services) | public static IServiceCollection AddMqttClient(this IServiceCollection services) | ||||
{ | { | ||||
services.AddSingleton<MqttFactory>(); | services.AddSingleton<MqttFactory>(); | ||||
services.AddSingleton<IMqttCommunicationAdapterFactory, MqttFactory>(); | |||||
services.AddSingleton<IMqttCommunicationAdapterFactory>(s => s.GetService<MqttFactory>()); | |||||
services.AddTransient<IMqttClient, MqttClient>(); | services.AddTransient<IMqttClient, MqttClient>(); | ||||
services.AddTransient<MqttClient>(); | services.AddTransient<MqttClient>(); | ||||
@@ -3,6 +3,7 @@ using System.IO; | |||||
using System.Net.WebSockets; | using System.Net.WebSockets; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet.Implementations | namespace MQTTnet.Implementations | ||||
{ | { | ||||
@@ -27,13 +28,19 @@ namespace MQTTnet.Implementations | |||||
{ | { | ||||
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); | var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); | ||||
currentOffset += response.Count; | currentOffset += response.Count; | ||||
count -= response.Count; | |||||
if (response.MessageType == WebSocketMessageType.Close) | if (response.MessageType == WebSocketMessageType.Close) | ||||
{ | { | ||||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); | await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); | ||||
} | } | ||||
} | } | ||||
if (_webSocket.State == WebSocketState.Closed) | |||||
{ | |||||
throw new MqttCommunicationException("connection closed"); | |||||
} | |||||
return currentOffset - offset; | return currentOffset - offset; | ||||
} | } | ||||
@@ -10,5 +10,7 @@ namespace MQTTnet.Core.ManagedClient | |||||
TimeSpan AutoReconnectDelay { get; } | TimeSpan AutoReconnectDelay { get; } | ||||
IManagedMqttClientStorage Storage { get; } | IManagedMqttClientStorage Storage { get; } | ||||
Func<IManagedMqttClientOptions, string> PasswordProvider { get; } | |||||
} | } | ||||
} | } |
@@ -277,6 +277,7 @@ namespace MQTTnet.Core.ManagedClient | |||||
try | try | ||||
{ | { | ||||
_options.PasswordProvider?.Invoke(_options); | |||||
await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); | await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); | ||||
return ReconnectionResult.Reconnected; | return ReconnectionResult.Reconnected; | ||||
} | } | ||||
@@ -10,5 +10,8 @@ namespace MQTTnet.Core.ManagedClient | |||||
public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); | public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); | ||||
public IManagedMqttClientStorage Storage { get; set; } | public IManagedMqttClientStorage Storage { get; set; } | ||||
public Func<IManagedMqttClientOptions, string> PasswordProvider { get; set; } | |||||
} | } | ||||
} | } |
@@ -9,7 +9,7 @@ using System.Linq; | |||||
namespace MQTTnet.Core.Server | namespace MQTTnet.Core.Server | ||||
{ | { | ||||
public sealed class MqttServer : IMqttServer | |||||
public class MqttServer : IMqttServer | |||||
{ | { | ||||
private readonly ILogger<MqttServer> _logger; | private readonly ILogger<MqttServer> _logger; | ||||
private readonly MqttClientSessionsManager _clientSessionsManager; | private readonly MqttClientSessionsManager _clientSessionsManager; | ||||
@@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", " | |||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.AspnetCore", "Frameworks\MQTTnet.AspnetCore\MQTTnet.AspnetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}" | |||||
EndProject | |||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
Debug|Any CPU = Debug|Any CPU | Debug|Any CPU = Debug|Any CPU | ||||
@@ -163,6 +165,22 @@ Global | |||||
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU | {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU | ||||
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU | {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU | ||||
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU | {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU | ||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU | |||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
@@ -174,6 +192,7 @@ Global | |||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | ||||
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB} | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | ||||
@@ -2,10 +2,11 @@ | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.0</TargetFramework> | <TargetFramework>netcoreapp2.0</TargetFramework> | ||||
<TypeScriptToolsVersion>2.3</TypeScriptToolsVersion> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<Folder Include="wwwroot\" /> | |||||
<Content Remove="wwwroot\app\app.ts" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -13,8 +14,11 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" /> | |||||
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" /> | |||||
<ProjectReference Include="..\..\Frameworks\MQTTnet.AspnetCore\MQTTnet.AspnetCore.csproj" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<TypeScriptCompile Include="wwwroot\app\app.ts" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
</Project> | </Project> |
@@ -1,11 +1,13 @@ | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System; | |||||
using System.IO; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.AspNetCore.Builder; | using Microsoft.AspNetCore.Builder; | ||||
using Microsoft.AspNetCore.Hosting; | using Microsoft.AspNetCore.Hosting; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.FileProviders; | |||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Server; | |||||
using MQTTnet.AspnetCore; | |||||
using MQTTnet.Core; | |||||
namespace MQTTnet.TestApp.AspNetCore2 | namespace MQTTnet.TestApp.AspNetCore2 | ||||
{ | { | ||||
@@ -13,35 +15,44 @@ namespace MQTTnet.TestApp.AspNetCore2 | |||||
{ | { | ||||
public void ConfigureServices(IServiceCollection services) | public void ConfigureServices(IServiceCollection services) | ||||
{ | { | ||||
services.AddMqttServer(); | |||||
services.AddSingleton<MqttWebSocketServerAdapter>(); | |||||
services.AddSingleton<IMqttServerAdapter, MqttWebSocketServerAdapter>(); | |||||
services.AddHostedMqttServer(); | |||||
} | } | ||||
public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) | |||||
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) | |||||
{ | { | ||||
loggerFactory.AddConsole(LogLevel.Debug); | |||||
loggerFactory.AddDebug(); | |||||
var adapter = app.ApplicationServices.GetService<MqttWebSocketServerAdapter>(); | |||||
var mqttServer = app.ApplicationServices.GetService<IMqttServer>(); | |||||
await mqttServer.StartAsync(); | |||||
app.UseWebSockets(); | |||||
app.Use(async (context, next) => | |||||
app.UseMqttEndpoint(); | |||||
app.UseMqttServer(async server => | |||||
{ | { | ||||
if (context.WebSockets.IsWebSocketRequest) | |||||
var msg = new MqttApplicationMessageBuilder() | |||||
.WithPayload("Mqtt is awesome") | |||||
.WithTopic("message"); | |||||
while (true) | |||||
{ | { | ||||
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync()) | |||||
{ | |||||
await adapter.AcceptWebSocketAsync(webSocket); | |||||
} | |||||
server.Publish(msg.Build()); | |||||
await Task.Delay(TimeSpan.FromSeconds(2)); | |||||
msg.WithPayload("Mqtt is still awesome at " + DateTime.Now); | |||||
} | } | ||||
else | |||||
}); | |||||
app.Use((context, next) => | |||||
{ | |||||
if (context.Request.Path == "/") | |||||
{ | { | ||||
await next(); | |||||
context.Request.Path = "/Index.html"; | |||||
} | } | ||||
return next(); | |||||
}); | }); | ||||
app.UseStaticFiles(); | |||||
app.UseStaticFiles( new StaticFileOptions() | |||||
{ | |||||
RequestPath = "/node_modules", | |||||
FileProvider = new PhysicalFileProvider( Path.Combine(env.ContentRootPath, "node_modules" ) ) | |||||
} ); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,14 @@ | |||||
{ | |||||
"version": "1.0.0", | |||||
"name": "mqtt.test", | |||||
"private": true, | |||||
"devDependencies": { | |||||
}, | |||||
"dependencies": { | |||||
"mqtt": "2.13.1", | |||||
"@types/node": "8.0.46", | |||||
"systemjs": "0.20.19", | |||||
"typescript": "2.3.4" | |||||
} | |||||
} |
@@ -0,0 +1,12 @@ | |||||
{ | |||||
"compilerOptions": { | |||||
"noImplicitAny": false, | |||||
"noEmitOnError": true, | |||||
"removeComments": false, | |||||
"sourceMap": true, | |||||
"target": "es5" | |||||
}, | |||||
"exclude": [ | |||||
"node_modules" | |||||
] | |||||
} |
@@ -0,0 +1,35 @@ | |||||
<!DOCTYPE html> | |||||
<html> | |||||
<head> | |||||
<meta charset="utf-8" /> | |||||
<title>MQTT test</title> | |||||
</head> | |||||
<body> | |||||
<script src="node_modules/systemjs/dist/system.js"></script> | |||||
<script> | |||||
System.config({ | |||||
packages: { | |||||
'': { | |||||
defaultExtension: 'js' | |||||
} | |||||
}, | |||||
map: { | |||||
mqtt: 'node_modules/mqtt/dist/mqtt.min' | |||||
} | |||||
}); | |||||
System.import('app/app'); | |||||
</script> | |||||
<div> | |||||
<p id="state"></p> | |||||
<input id="topic" placeholder="topic" /> | |||||
<input id="msg" placeholder="msg" /> | |||||
<button id="publish">send</button> | |||||
</div> | |||||
<div> | |||||
<ul id="msgs" ></ul> | |||||
</div> | |||||
</body> | |||||
</html> |
@@ -0,0 +1,56 @@ | |||||
import { connect } from "mqtt"; | |||||
var client = connect('ws://' + location.host + '/mqtt', | |||||
{ | |||||
clientId: "client1", | |||||
}); | |||||
window.onbeforeunload = () => { | |||||
client.end(); | |||||
}; | |||||
var publishButton = document.getElementById("publish"); | |||||
var topicInput = <HTMLInputElement>document.getElementById("topic"); | |||||
var msgInput = <HTMLInputElement>document.getElementById("msg"); | |||||
var stateParagraph = document.getElementById("state"); | |||||
var msgsList = <HTMLUListElement>document.getElementById("msgs"); | |||||
publishButton.onclick = click => { | |||||
var topic = topicInput.value; | |||||
var msg = msgInput.value; | |||||
client.publish(topic, msg); | |||||
}; | |||||
client.on('connect', () => { | |||||
client.subscribe('#', { qos: 0 }, (err, granted) => { | |||||
console.log(err); | |||||
}); | |||||
client.publish('presence', 'Hello mqtt'); | |||||
stateParagraph.innerText = "connected"; | |||||
}); | |||||
client.on("error", e => { | |||||
showMsg("error: " + e.message); | |||||
}); | |||||
client.on("reconnect", () => { | |||||
stateParagraph.innerText = "reconnecting"; | |||||
}); | |||||
client.on('message', (topic, message) => { | |||||
showMsg(topic + ": " + message.toString()); | |||||
}); | |||||
function showMsg(msg: string) { | |||||
//console.log(msg); | |||||
var node = document.createElement("LI"); | |||||
node.appendChild(document.createTextNode(msg)); | |||||
msgsList.appendChild(node); | |||||
if (msgsList.childElementCount > 20) { | |||||
msgsList.removeChild(msgsList.childNodes[0]); | |||||
} | |||||
} |
@@ -7,6 +7,9 @@ using MQTTnet.Core.Packets; | |||||
using MQTTnet.Core.Protocol; | using MQTTnet.Core.Protocol; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using System.IO; | |||||
using Newtonsoft.Json; | |||||
using System.Collections.Generic; | |||||
namespace MQTTnet.TestApp.NetCore | namespace MQTTnet.TestApp.NetCore | ||||
{ | { | ||||
@@ -22,18 +25,32 @@ namespace MQTTnet.TestApp.NetCore | |||||
services.GetService<ILoggerFactory>() | services.GetService<ILoggerFactory>() | ||||
.AddConsole(); | .AddConsole(); | ||||
ClientRetainedMessageHandler ms = new ClientRetainedMessageHandler(); | |||||
Func<ManagedMqttClientOptions, string> func = delegate (ManagedMqttClientOptions managedMqttClientOptions) | |||||
{ | |||||
return "password"; | |||||
}; | |||||
var options = new ManagedMqttClientOptions | var options = new ManagedMqttClientOptions | ||||
{ | { | ||||
ClientOptions = new MqttClientTcpOptions | ClientOptions = new MqttClientTcpOptions | ||||
{ | { | ||||
Server = "broker.hivemq.com", | Server = "broker.hivemq.com", | ||||
ClientId = "MQTTnetManagedClientTest" | |||||
ClientId = "MQTTnetManagedClientTest", | |||||
Password = "pippo", | |||||
}, | }, | ||||
AutoReconnectDelay = TimeSpan.FromSeconds(1) | |||||
AutoReconnectDelay = TimeSpan.FromSeconds(1), | |||||
Storage = ms, | |||||
PasswordProvider = o => | |||||
{ | |||||
//o.ClientOptions.Password = GetPassword(); | |||||
return o.ClientOptions.Password; | |||||
} | |||||
}; | }; | ||||
try | try | ||||
{ | { | ||||
var managedClient = services.GetRequiredService<ManagedMqttClient>(); | var managedClient = services.GetRequiredService<ManagedMqttClient>(); | ||||
@@ -59,5 +76,39 @@ namespace MQTTnet.TestApp.NetCore | |||||
Console.WriteLine(e); | Console.WriteLine(e); | ||||
} | } | ||||
} | } | ||||
public static string GetPassword() | |||||
{ | |||||
return "password"; | |||||
} | |||||
public class ClientRetainedMessageHandler : IManagedMqttClientStorage | |||||
{ | |||||
private const string Filename = @"RetainedMessages.json"; | |||||
public Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages) | |||||
{ | |||||
File.WriteAllText(Filename, JsonConvert.SerializeObject(messages)); | |||||
return Task.FromResult(0); | |||||
} | |||||
public Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync() | |||||
{ | |||||
IList<MqttApplicationMessage> retainedMessages; | |||||
if (File.Exists(Filename)) | |||||
{ | |||||
var json = File.ReadAllText(Filename); | |||||
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json); | |||||
} | |||||
else | |||||
{ | |||||
retainedMessages = new List<MqttApplicationMessage>(); | |||||
} | |||||
return Task.FromResult(retainedMessages); | |||||
} | |||||
} | |||||
} | } | ||||
} | } |