Explorar el Código

Add support for WebSocket server via ASP.NET Core 2.0

release/3.x.x
Christian Kratky hace 7 años
padre
commit
f9eb6ed85e
Se han modificado 10 ficheros con 219 adiciones y 9 borrados
  1. +3
    -3
      Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
  2. +3
    -3
      Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs
  3. +3
    -0
      MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs
  4. +2
    -2
      MQTTnet.Core/Diagnostics/MqttNetTrace.cs
  5. +1
    -1
      MQTTnet.Core/Server/MqttServer.cs
  6. +19
    -0
      MQTTnet.sln
  7. +20
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj
  8. +95
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs
  9. +18
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/Program.cs
  10. +55
    -0
      Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs

+ 3
- 3
Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs Ver fichero

@@ -8,11 +8,11 @@ namespace MQTTnet.Implementations
{
public class WebSocketStream : Stream
{
private readonly ClientWebSocket _webSocket;
private readonly WebSocket _webSocket;
public WebSocketStream(ClientWebSocket webSocket)
public WebSocketStream(WebSocket webSocket)
{
_webSocket = webSocket;
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
}

public override bool CanRead => true;


+ 3
- 3
Frameworks/MQTTnet.UniversalWindows/Implementations/WebSocketStream.cs Ver fichero

@@ -8,11 +8,11 @@ namespace MQTTnet.Implementations
{
public class WebSocketStream : Stream
{
private readonly ClientWebSocket _webSocket;
private readonly WebSocket _webSocket;

public WebSocketStream(ClientWebSocket webSocket)
public WebSocketStream(WebSocket webSocket)
{
_webSocket = webSocket;
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
}

public override void Flush()


+ 3
- 0
MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs Ver fichero

@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Core.Adapter
{
@@ -10,5 +11,7 @@ namespace MQTTnet.Core.Adapter
}

public IMqttCommunicationAdapter Client { get; }

public Task SessionTask { get; set; }
}
}

+ 2
- 2
MQTTnet.Core/Diagnostics/MqttNetTrace.cs Ver fichero

@@ -6,9 +6,9 @@ namespace MQTTnet.Core.Diagnostics
{
private readonly IMqttNetTraceHandler _traceHandler;

public MqttNetTrace(IMqttNetTraceHandler traceHandler = null)
public MqttNetTrace(IMqttNetTraceHandler customTraceHandler = null)
{
_traceHandler = traceHandler ?? this;
_traceHandler = customTraceHandler ?? this;
}
public static event EventHandler<MqttNetTraceMessagePublishedEventArgs> TraceMessagePublished;


+ 1
- 1
MQTTnet.Core/Server/MqttServer.cs Ver fichero

@@ -81,7 +81,7 @@ namespace MQTTnet.Core.Server

private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token);
eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token);
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)


+ 19
- 0
MQTTnet.sln Ver fichero

@@ -31,6 +31,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -145,6 +147,22 @@ Global
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.Build.0 = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.ActiveCfg = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -155,6 +173,7 @@ Global
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}


+ 20
- 0
Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj Ver fichero

@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Folder Include="wwwroot\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" />
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" />
</ItemGroup>

</Project>

+ 95
- 0
Tests/MQTTnet.TestApp.AspNetCore2/MqttWebSocketServerAdapter.cs Ver fichero

@@ -0,0 +1,95 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server;
using MQTTnet.Implementations;

namespace MQTTnet.TestApp.AspNetCore2
{
public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable
{
private readonly MqttNetTrace _trace;

public MqttWebSocketServerAdapter(MqttNetTrace trace)
{
_trace = trace ?? throw new ArgumentNullException(nameof(trace));
}

public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;

public Task StartAsync(MqttServerOptions options)
{
return Task.CompletedTask;
}

public Task StopAsync()
{
return Task.FromResult(0);
}

public Task AcceptWebSocketAsync(WebSocket webSocket)
{
if (webSocket == null) throw new ArgumentNullException(nameof(webSocket));

var channel = new MqttWebSocketServerChannel(webSocket);
var clientAdapter = new MqttChannelCommunicationAdapter(channel, new MqttPacketSerializer(), _trace);

var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter);
ClientAccepted?.Invoke(this, eventArgs);
return eventArgs.SessionTask;
}
public void Dispose()
{
StopAsync();
}

private class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable
{
private readonly WebSocket _webSocket;

public MqttWebSocketServerChannel(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

RawReceiveStream = new WebSocketStream(_webSocket);
}

public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
public Stream RawReceiveStream { get; }

public Task ConnectAsync()
{
return Task.CompletedTask;
}

public Task DisconnectAsync()
{
RawReceiveStream?.Dispose();

if (_webSocket == null)
{
return Task.CompletedTask;
}

return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

public void Dispose()
{
RawReceiveStream?.Dispose();
SendStream?.Dispose();
ReceiveStream?.Dispose();

_webSocket?.Dispose();
}
}
}
}

+ 18
- 0
Tests/MQTTnet.TestApp.AspNetCore2/Program.cs Ver fichero

@@ -0,0 +1,18 @@
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;

namespace MQTTnet.TestApp.AspNetCore2
{
public static class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}

private static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
}

+ 55
- 0
Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs Ver fichero

@@ -0,0 +1,55 @@
using System.Collections.Generic;
using System.Diagnostics;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Server;

namespace MQTTnet.TestApp.AspNetCore2
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
}

public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(LogLevel.Debug);

MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Debug.WriteLine($">> [{e.TraceMessage.Timestamp}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
{
Debug.WriteLine(e.TraceMessage.Exception.Message);
}
};

var trace = new MqttNetTrace();
var adapter = new MqttWebSocketServerAdapter(trace);
var options = new MqttServerOptions();
var mqttServer = new MqttServer(options, new List<IMqttServerAdapter> { adapter }, new MqttNetTrace());
await mqttServer.StartAsync();
app.UseWebSockets();
app.Use(async (context, next) =>
{
if (context.WebSockets.IsWebSocketRequest)
{
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync())
{
await adapter.AcceptWebSocketAsync(webSocket);
}
}
else
{
await next();
}
});
}
}
}

Cargando…
Cancelar
Guardar