diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index 6efe750..d38de6f 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -8,11 +8,11 @@ namespace MQTTnet.Implementations { public class WebSocketStream : Stream { - private readonly ClientWebSocket _webSocket; + private readonly WebSocket _webSocket; - public WebSocketStream(ClientWebSocket webSocket) + public WebSocketStream(WebSocket webSocket) { - _webSocket = webSocket; + _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); } public override bool CanRead => true; diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs index 69225b9..1843c6a 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs @@ -8,11 +8,11 @@ namespace MQTTnet.Implementations { public class WebSocketStream : Stream { - private readonly ClientWebSocket _webSocket; + private readonly WebSocket _webSocket; - public WebSocketStream(ClientWebSocket webSocket) + public WebSocketStream(WebSocket webSocket) { - _webSocket = webSocket; + _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); } public override void Flush() diff --git a/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs b/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs index 6d7fb54..6caca0e 100644 --- a/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs +++ b/MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace MQTTnet.Core.Adapter { @@ -10,5 +11,7 @@ namespace MQTTnet.Core.Adapter } public IMqttCommunicationAdapter Client { get; } + + public Task SessionTask { get; set; } } } diff --git a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs index 35a828c..ece6cf7 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs @@ -6,9 +6,9 @@ namespace MQTTnet.Core.Diagnostics { private readonly IMqttNetTraceHandler _traceHandler; - public MqttNetTrace(IMqttNetTraceHandler traceHandler = null) + public MqttNetTrace(IMqttNetTraceHandler customTraceHandler = null) { - _traceHandler = traceHandler ?? this; + _traceHandler = customTraceHandler ?? this; } public static event EventHandler TraceMessagePublished; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index 7b13d71..7da5957 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -81,7 +81,7 @@ namespace MQTTnet.Core.Server private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { - Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token); + eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token); } private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) diff --git a/MQTTnet.sln b/MQTTnet.sln index 79e8b4d..db44de8 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -31,6 +31,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -145,6 +147,22 @@ Global {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.Build.0 = Release|Any CPU {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.ActiveCfg = Release|Any CPU {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.ActiveCfg = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.Build.0 = Debug|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -155,6 +173,7 @@ Global {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} + {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj new file mode 100644 index 0000000..ce35f43 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -0,0 +1,20 @@ + + + + netcoreapp2.0 + + + + + + + + + + + + + + + + diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs b/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs new file mode 100644 index 0000000..c9f5293 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs @@ -0,0 +1,95 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; +using MQTTnet.Implementations; + +namespace MQTTnet.TestApp.AspNetCore2 +{ + public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable + { + private readonly MqttNetTrace _trace; + + public MqttWebSocketServerAdapter(MqttNetTrace trace) + { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + } + + public event EventHandler ClientAccepted; + + public Task StartAsync(MqttServerOptions options) + { + return Task.CompletedTask; + } + + public Task StopAsync() + { + return Task.FromResult(0); + } + + public Task AcceptWebSocketAsync(WebSocket webSocket) + { + if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); + + var channel = new MqttWebSocketServerChannel(webSocket); + var clientAdapter = new MqttChannelCommunicationAdapter(channel, new MqttPacketSerializer(), _trace); + + var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); + ClientAccepted?.Invoke(this, eventArgs); + return eventArgs.SessionTask; + } + + public void Dispose() + { + StopAsync(); + } + + private class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable + { + private readonly WebSocket _webSocket; + + public MqttWebSocketServerChannel(WebSocket webSocket) + { + _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); + + RawReceiveStream = new WebSocketStream(_webSocket); + } + + public Stream SendStream => RawReceiveStream; + public Stream ReceiveStream => RawReceiveStream; + public Stream RawReceiveStream { get; } + + public Task ConnectAsync() + { + return Task.CompletedTask; + } + + public Task DisconnectAsync() + { + RawReceiveStream?.Dispose(); + + if (_webSocket == null) + { + return Task.CompletedTask; + } + + return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + RawReceiveStream?.Dispose(); + SendStream?.Dispose(); + ReceiveStream?.Dispose(); + + _webSocket?.Dispose(); + } + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs new file mode 100644 index 0000000..38b8c12 --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs @@ -0,0 +1,18 @@ +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; + +namespace MQTTnet.TestApp.AspNetCore2 +{ + public static class Program + { + public static void Main(string[] args) + { + BuildWebHost(args).Run(); + } + + private static IWebHost BuildWebHost(string[] args) => + WebHost.CreateDefaultBuilder(args) + .UseStartup() + .Build(); + } +} diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs new file mode 100644 index 0000000..d730d2c --- /dev/null +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -0,0 +1,55 @@ +using System.Collections.Generic; +using System.Diagnostics; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Server; + +namespace MQTTnet.TestApp.AspNetCore2 +{ + public class Startup + { + public void ConfigureServices(IServiceCollection services) + { + } + + public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + { + loggerFactory.AddConsole(LogLevel.Debug); + + MqttNetTrace.TraceMessagePublished += (s, e) => + { + Debug.WriteLine($">> [{e.TraceMessage.Timestamp}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); + if (e.TraceMessage.Exception != null) + { + Debug.WriteLine(e.TraceMessage.Exception.Message); + } + }; + + var trace = new MqttNetTrace(); + var adapter = new MqttWebSocketServerAdapter(trace); + var options = new MqttServerOptions(); + var mqttServer = new MqttServer(options, new List { adapter }, new MqttNetTrace()); + await mqttServer.StartAsync(); + + app.UseWebSockets(); + app.Use(async (context, next) => + { + if (context.WebSockets.IsWebSocketRequest) + { + using (var webSocket = await context.WebSockets.AcceptWebSocketAsync()) + { + await adapter.AcceptWebSocketAsync(webSocket); + } + } + else + { + await next(); + } + }); + } + } +}