diff --git a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs new file mode 100644 index 0000000..e308ccd --- /dev/null +++ b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs @@ -0,0 +1,34 @@ +using System; +using System.Net; +using MQTTnet.Adapter; +using MQTTnet.AspNetCore.Client.Tcp; +using MQTTnet.Client; +using MQTTnet.Diagnostics; +using MQTTnet.Serializer; + +namespace MQTTnet.AspNetCore.Client +{ + public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory + { + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + var serializer = new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }; + + switch (options.ChannelOptions) + { + case MqttClientTcpOptions tcpOptions: + { + var endpoint = new DnsEndPoint(tcpOptions.Server, tcpOptions.GetPort()); + var tcpConnection = new TcpConnection(endpoint); + return new MqttConnectionContext(serializer, tcpConnection); + } + default: + { + throw new NotSupportedException(); + } + } + } + } +} diff --git a/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs similarity index 93% rename from Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs index 879306c..5911a3a 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/BufferExtensions.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs @@ -1,7 +1,7 @@ using System; using System.Runtime.InteropServices; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { public static class BufferExtensions { diff --git a/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs similarity index 96% rename from Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs index f5f3316..e234da5 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/DuplexPipe.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs @@ -1,6 +1,6 @@ using System.IO.Pipelines; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { public class DuplexPipe : IDuplexPipe { diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs similarity index 97% rename from Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs index 2271bd7..96160d1 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/SocketAwaitable.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs @@ -6,7 +6,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { public class SocketAwaitable : ICriticalNotifyCompletion { diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs similarity index 96% rename from Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs index bc8e5c0..219b722 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/SocketReceiver.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs @@ -2,7 +2,7 @@ using System.IO.Pipelines; using System.Net.Sockets; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { public class SocketReceiver { diff --git a/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs similarity index 98% rename from Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs index 7cb1bc1..c8ba832 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/SocketSender.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs @@ -5,7 +5,7 @@ using System.Diagnostics; using System.IO.Pipelines; using System.Net.Sockets; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { public class SocketSender { diff --git a/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs similarity index 81% rename from Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs index 83cb98a..7417e74 100644 --- a/Tests/MQTTnet.Benchmarks/Tcp/TcpConnection.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs @@ -1,30 +1,39 @@ using System; +using System.Collections.Generic; using System.IO; using System.IO.Pipelines; using System.Net; using System.Net.Sockets; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Http.Features; using MQTTnet.Exceptions; -namespace MQTTnet.Benchmarks.Tcp +namespace MQTTnet.AspNetCore.Client.Tcp { - public class TcpConnection + public class TcpConnection : ConnectionContext { - private readonly Socket _socket; private volatile bool _aborted; private readonly EndPoint _endPoint; + private SocketSender _sender; + private SocketReceiver _receiver; + + private Socket _socket; private IDuplexPipe _application; - private IDuplexPipe _transport; - private readonly SocketSender _sender; - private readonly SocketReceiver _receiver; + + + public bool IsConnected { get; private set; } + + public override string ConnectionId { get; set; } + + public override IFeatureCollection Features { get; } + + public override IDictionary Items { get; set; } + public override IDuplexPipe Transport { get; set; } public TcpConnection(EndPoint endPoint) { - _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _endPoint = endPoint; - - _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); - _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); } public TcpConnection(Socket socket) @@ -38,29 +47,34 @@ namespace MQTTnet.Benchmarks.Tcp public Task DisposeAsync() { - _transport?.Output.Complete(); - _transport?.Input.Complete(); + IsConnected = false; + + Transport?.Output.Complete(); + Transport?.Input.Complete(); _socket?.Dispose(); return Task.CompletedTask; } - public async Task StartAsync() + public async Task StartAsync() { - if (!_socket.Connected) + if (_socket == null) { + _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _sender = new SocketSender(_socket, PipeScheduler.ThreadPool); + _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool); await _socket.ConnectAsync(_endPoint); } var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default); - _transport = pair.Transport; + Transport = pair.Transport; _application = pair.Application; _ = ExecuteAsync(); - return pair.Transport; + IsConnected = true; } private async Task ExecuteAsync() @@ -118,14 +132,14 @@ namespace MQTTnet.Benchmarks.Tcp if (!_aborted) { // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix. - //error = new MqttCommunicationException(); + error = ConnectionAborted(); } } catch (ObjectDisposedException) { if (!_aborted) { - //error = new MqttCommunicationException(); + error = ConnectionAborted(); } } catch (IOException ex) @@ -140,7 +154,7 @@ namespace MQTTnet.Benchmarks.Tcp { if (_aborted) { - //error = error ?? new MqttCommunicationException(); + error = error ?? ConnectionAborted(); } _application.Output.Complete(error); @@ -180,6 +194,11 @@ namespace MQTTnet.Benchmarks.Tcp } } + private Exception ConnectionAborted() + { + return new MqttCommunicationException("Connection Aborted"); + } + private async Task DoSend() { Exception error = null; diff --git a/Source/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs b/Source/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs new file mode 100644 index 0000000..262a333 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs @@ -0,0 +1,12 @@ +using Microsoft.AspNetCore.Connections; + +namespace MQTTnet.AspNetCore +{ + public static class ConnectionBuilderExtensions + { + public static IConnectionBuilder UseMqtt(this IConnectionBuilder builder) + { + return builder.UseConnectionHandler(); + } + } +} diff --git a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj index 6b85895..8db250b 100644 --- a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj +++ b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj @@ -9,6 +9,7 @@ + 7.2 @@ -16,7 +17,7 @@ - + diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs new file mode 100644 index 0000000..3d42937 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -0,0 +1,120 @@ +using Microsoft.AspNetCore.Connections; +using MQTTnet.Adapter; +using MQTTnet.AspNetCore.Client.Tcp; +using MQTTnet.Packets; +using MQTTnet.Serializer; +using System; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.AspNetCore +{ + public class MqttConnectionContext : IMqttChannelAdapter + { + public IMqttPacketSerializer PacketSerializer { get; } + public ConnectionContext Connection { get; } + + public string Endpoint => Connection.ConnectionId; + + public MqttConnectionContext( + IMqttPacketSerializer packetSerializer, + ConnectionContext connection) + { + PacketSerializer = packetSerializer; + Connection = connection; + } + + public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + if (Connection is TcpConnection tcp && !tcp.IsConnected) + { + return tcp.StartAsync(); + } + return Task.CompletedTask; + } + + public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + Connection.Transport.Input.Complete(); + Connection.Transport.Output.Complete(); + + return Task.CompletedTask; + } + + public void Dispose() + { + } + + public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + var input = Connection.Transport.Input; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + ReadResult readResult; + var readTask = input.ReadAsync(cancellationToken); + if (readTask.IsCompleted) + { + readResult = readTask.Result; + } + else + { + readResult = await readTask; + } + + var buffer = readResult.Buffer; + + var consumed = buffer.Start; + var observed = buffer.Start; + + try + { + if (!buffer.IsEmpty) + { + if (PacketSerializer.TryDeserialize(buffer, out var packet, out consumed, out observed)) + { + return packet; + } + else + { + // we did receive something but the message is not yet complete + ReadingPacketStarted?.Invoke(this, EventArgs.Empty); + } + } + else if (readResult.IsCompleted) + { + break; + } + } + finally + { + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + input.AdvanceTo(consumed, observed); + } + } + } + finally + { + ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); + } + + cancellationToken.ThrowIfCancellationRequested(); + return null; + } + + public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { + var buffer = PacketSerializer.Serialize(packet); + return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask(); + } + + public event EventHandler ReadingPacketStarted; + public event EventHandler ReadingPacketCompleted; + } +} diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs new file mode 100644 index 0000000..49a5c09 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs @@ -0,0 +1,40 @@ +using Microsoft.AspNetCore.Connections; +using MQTTnet.Adapter; +using MQTTnet.Serializer; +using MQTTnet.Server; +using System; +using System.Threading.Tasks; + +namespace MQTTnet.AspNetCore +{ + public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter + { + public event EventHandler ClientAccepted; + + public override async Task OnConnectedAsync(ConnectionContext connection) + { + var serializer = new MqttPacketSerializer(); + using (var adapter = new MqttConnectionContext(serializer, connection)) + { + var args = new MqttServerAdapterClientAcceptedEventArgs(adapter); + ClientAccepted?.Invoke(this, args); + + await args.SessionTask; + } + } + + public Task StartAsync(IMqttServerOptions options) + { + return Task.CompletedTask; + } + + public Task StopAsync() + { + return Task.CompletedTask; + } + + public void Dispose() + { + } + } +} diff --git a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs new file mode 100644 index 0000000..0411710 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs @@ -0,0 +1,85 @@ +using System; +using System.Buffers; +using System.IO; +using MQTTnet.Adapter; +using MQTTnet.Exceptions; +using MQTTnet.Packets; +using MQTTnet.Serializer; + +namespace MQTTnet.AspNetCore +{ + public static class ReaderExtensions + { + private static bool TryReadBodyLength(ref ReadOnlySequence input, out int result) + { + // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. + var multiplier = 1; + var value = 0; + byte encodedByte; + var index = 1; + result = 0; + + var temp = input.Slice(0, Math.Min(5, input.Length)).GetArray(); + + do + { + if (index == temp.Length) + { + return false; + } + encodedByte = temp[index]; + index++; + + value += (byte)(encodedByte & 127) * multiplier; + if (multiplier > 128 * 128 * 128) + { + throw new MqttProtocolViolationException($"Remaining length is invalid (Data={string.Join(",", temp.AsSpan(1, index).ToArray())})."); + } + + multiplier *= 128; + } while ((encodedByte & 128) != 0); + + input = input.Slice(index); + + result = value; + return true; + } + + + + public static byte[] GetArray(this in ReadOnlySequence input) + { + if (input.IsSingleSegment) + { + return input.First.Span.ToArray(); + } + + // Should be rare + return input.ToArray(); + } + + public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) + { + packet = null; + consumed = input.Start; + observed = input.End; + var copy = input; + if (copy.Length < 2) + { + return false; + } + + var fixedheader = copy.First.Span[0]; + if (!TryReadBodyLength(ref copy, out var bodyLength)) + { + return false; + } + + var bodySlice = copy.Slice(0, bodyLength); + packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray()))); + consumed = bodySlice.End; + observed = bodySlice.End; + return true; + } + } +} diff --git a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs index 5b8f161..fa061eb 100644 --- a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -30,10 +30,19 @@ namespace MQTTnet.AspNetCore if (options.DefaultEndpointOptions.IsEnabled) { + services.AddSingleton(); services.AddSingleton(s => s.GetService()); } return services; } + + public static IServiceCollection AddMqttConnectionHandler(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); + + return services; + } } } diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index 00438fa..e830ad2 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -22,6 +22,14 @@ namespace MQTTnet return new MqttClient(new MqttClientAdapterFactory(), logger); } + public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory mqttClientAdapterFactory) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + if (mqttClientAdapterFactory == null) throw new ArgumentNullException(nameof(mqttClientAdapterFactory)); + + return new MqttClient(mqttClientAdapterFactory, logger); + } + public IMqttServer CreateMqttServer() { var logger = new MqttNetLogger(); diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj index 5e612fb..2a6b91a 100644 --- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj +++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj @@ -10,9 +10,11 @@ + + diff --git a/Tests/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs b/Tests/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs new file mode 100644 index 0000000..1e2fbb2 --- /dev/null +++ b/Tests/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs @@ -0,0 +1,76 @@ +using BenchmarkDotNet.Attributes; +using MQTTnet.Client; + + +using MQTTnet.AspNetCore; + +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using MQTTnet.Server; +using MQTTnet.Diagnostics; +using MQTTnet.AspNetCore.Client; + +namespace MQTTnet.Benchmarks +{ + [MemoryDiagnoser] + public class MessageProcessingMqttConnectionContextBenchmark + { + private IWebHost _host; + private IMqttClient _mqttClient; + private MqttApplicationMessage _message; + + [GlobalSetup] + public void Setup() + { + _host = WebHost.CreateDefaultBuilder() + .UseKestrel(o => o.ListenAnyIP(1883, l => l.UseMqtt())) + .ConfigureServices(services => { + var mqttServerOptions = new MqttServerOptionsBuilder() + .WithoutDefaultEndpoint() + .Build(); + services + .AddHostedMqttServer(mqttServerOptions) + .AddMqttConnectionHandler(); + }) + .Configure(app => { + app.UseMqttServer(s => { + + }); + }) + .Build(); + + var factory = new MqttFactory(); + _mqttClient = factory.CreateMqttClient(new MqttNetLogger(), new MqttClientConnectionContextFactory()); + + _host.StartAsync().GetAwaiter().GetResult(); + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost").Build(); + + _mqttClient.ConnectAsync(clientOptions).GetAwaiter().GetResult(); + + _message = new MqttApplicationMessageBuilder() + .WithTopic("A") + .Build(); + } + + [GlobalCleanup] + public void Cleanup() + { + _mqttClient.DisconnectAsync().GetAwaiter().GetResult(); + _mqttClient.Dispose(); + + _host.StopAsync().GetAwaiter().GetResult(); + _host.Dispose(); + } + + [Benchmark] + public void Send_10000_Messages() + { + for (var i = 0; i < 10000; i++) + { + _mqttClient.PublishAsync(_message).GetAwaiter().GetResult(); + } + } + } +} diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index 5306916..9407ccf 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -16,6 +16,7 @@ namespace MQTTnet.Benchmarks Console.WriteLine("5 = ChannelAdapterBenchmark"); Console.WriteLine("6 = MqttTcpChannelBenchmark"); Console.WriteLine("7 = TcpPipesBenchmark"); + Console.WriteLine("8 = MessageProcessingMqttConnectionContextBenchmark"); var pressedKey = Console.ReadKey(true); switch (pressedKey.KeyChar) @@ -41,6 +42,9 @@ namespace MQTTnet.Benchmarks case '7': BenchmarkRunner.Run(); break; + case '8': + BenchmarkRunner.Run(new AllowNonOptimized()); + break; } Console.ReadLine(); diff --git a/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs index cb3ffba..4252ab3 100644 --- a/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/TcpPipesBenchmark.cs @@ -4,7 +4,7 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; -using MQTTnet.Benchmarks.Tcp; +using MQTTnet.AspNetCore.Client.Tcp; namespace MQTTnet.Benchmarks { @@ -25,10 +25,12 @@ namespace MQTTnet.Benchmarks var clientConnection = new TcpConnection(new IPEndPoint(IPAddress.Loopback, 1883)); - _client = clientConnection.StartAsync().GetAwaiter().GetResult(); + clientConnection.StartAsync().GetAwaiter().GetResult(); + _client = clientConnection.Transport; var serverConnection = new TcpConnection(task.GetAwaiter().GetResult()); - _server = serverConnection.StartAsync().GetAwaiter().GetResult(); + serverConnection.StartAsync().GetAwaiter().GetResult(); + _server = serverConnection.Transport; } diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj index c587ecc..a216d72 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj +++ b/Tests/MQTTnet.TestApp.AspNetCore2/MQTTnet.TestApp.AspNetCore2.csproj @@ -1,7 +1,7 @@  - netcoreapp2.0 + netcoreapp2.1 Latest @@ -10,7 +10,8 @@ - + + diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs index 38b8c12..5248ce6 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Program.cs @@ -1,5 +1,6 @@ using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; +using MQTTnet.AspNetCore; namespace MQTTnet.TestApp.AspNetCore2 { @@ -12,6 +13,10 @@ namespace MQTTnet.TestApp.AspNetCore2 private static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) + .UseKestrel(o => { + o.ListenAnyIP(1883, l => l.UseMqtt()); + o.ListenAnyIP(5000); // default http pipeline + }) .UseStartup() .Build(); } diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index 591cc87..f52040a 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -17,8 +17,12 @@ namespace MQTTnet.TestApp.AspNetCore2 public void ConfigureServices(IServiceCollection services) { - var mqttServerOptions = new MqttServerOptionsBuilder().Build(); - services.AddHostedMqttServer(mqttServerOptions); + var mqttServerOptions = new MqttServerOptionsBuilder() + .WithoutDefaultEndpoint() + .Build(); + services + .AddHostedMqttServer(mqttServerOptions) + .AddMqttConnectionHandler(); } // In class _Startup_ of the ASP.NET Core 2.0 project.