From 66048931a2bae650582e638b010cee0104a797f8 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 17 Jun 2018 17:19:08 +0200 Subject: [PATCH] Refactor ASP net Core integration and fix breaking change. --- .../Client/Tcp/SocketAwaitable.cs | 3 +- .../Client/Tcp/SocketReceiver.cs | 1 - .../Client/Tcp/SocketSender.cs | 1 - .../Client/Tcp/TcpConnection.cs | 6 -- .../MqttConnectionContext.cs | 25 +++---- Source/MQTTnet.AspnetCore/MqttHostedServer.cs | 3 +- Source/MQTTnet.AspnetCore/ReaderExtensions.cs | 75 +++++++++---------- 7 files changed, 51 insertions(+), 63 deletions(-) diff --git a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs index 96160d1..dbc2612 100644 --- a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs @@ -23,9 +23,10 @@ namespace MQTTnet.AspNetCore.Client.Tcp _ioScheduler = ioScheduler; } - public SocketAwaitable GetAwaiter() => this; public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + public SocketAwaitable GetAwaiter() => this; + public int GetResult() { Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); diff --git a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs index 219b722..7d11fa2 100644 --- a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs @@ -24,7 +24,6 @@ namespace MQTTnet.AspNetCore.Client.Tcp _eventArgs.SetBuffer(buffer); #else var segment = buffer.GetArray(); - _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); #endif if (!_socket.ReceiveAsync(_eventArgs)) diff --git a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs index c8ba832..55192d6 100644 --- a/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs @@ -61,7 +61,6 @@ namespace MQTTnet.AspNetCore.Client.Tcp _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); #else var segment = memory.GetArray(); - _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); #endif if (!_socket.SendAsync(_eventArgs)) diff --git a/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs index 7417e74..37913fe 100644 --- a/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs @@ -21,13 +21,9 @@ namespace MQTTnet.AspNetCore.Client.Tcp private Socket _socket; private IDuplexPipe _application; - public bool IsConnected { get; private set; } - public override string ConnectionId { get; set; } - public override IFeatureCollection Features { get; } - public override IDictionary Items { get; set; } public override IDuplexPipe Transport { get; set; } @@ -209,11 +205,9 @@ namespace MQTTnet.AspNetCore.Client.Tcp } catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) { - error = null; } catch (ObjectDisposedException) { - error = null; } catch (IOException ex) { diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index 3d42937..8a8469d 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -4,7 +4,6 @@ using MQTTnet.AspNetCore.Client.Tcp; using MQTTnet.Packets; using MQTTnet.Serializer; using System; -using System.Collections.Generic; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; @@ -13,11 +12,6 @@ namespace MQTTnet.AspNetCore { public class MqttConnectionContext : IMqttChannelAdapter { - public IMqttPacketSerializer PacketSerializer { get; } - public ConnectionContext Connection { get; } - - public string Endpoint => Connection.ConnectionId; - public MqttConnectionContext( IMqttPacketSerializer packetSerializer, ConnectionContext connection) @@ -26,6 +20,12 @@ namespace MQTTnet.AspNetCore Connection = connection; } + public string Endpoint => Connection.ConnectionId; + public ConnectionContext Connection { get; } + public IMqttPacketSerializer PacketSerializer { get; } + public event EventHandler ReadingPacketStarted; + public event EventHandler ReadingPacketCompleted; + public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { if (Connection is TcpConnection tcp && !tcp.IsConnected) @@ -43,10 +43,6 @@ namespace MQTTnet.AspNetCore return Task.CompletedTask; } - public void Dispose() - { - } - public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) { var input = Connection.Transport.Input; @@ -106,15 +102,16 @@ namespace MQTTnet.AspNetCore cancellationToken.ThrowIfCancellationRequested(); return null; - } + } public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { var buffer = PacketSerializer.Serialize(packet); return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask(); } - - public event EventHandler ReadingPacketStarted; - public event EventHandler ReadingPacketCompleted; + + public void Dispose() + { + } } } diff --git a/Source/MQTTnet.AspnetCore/MqttHostedServer.cs b/Source/MQTTnet.AspnetCore/MqttHostedServer.cs index 708d383..4b6b436 100644 --- a/Source/MQTTnet.AspnetCore/MqttHostedServer.cs +++ b/Source/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -13,7 +13,8 @@ namespace MQTTnet.AspNetCore { private readonly IMqttServerOptions _options; - public MqttHostedServer(IMqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) : base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer))) + public MqttHostedServer(IMqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) + : base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer))) { _options = options ?? throw new ArgumentNullException(nameof(options)); } diff --git a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs index 0411710..2b7d8a6 100644 --- a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs @@ -1,6 +1,5 @@ using System; using System.Buffers; -using System.IO; using MQTTnet.Adapter; using MQTTnet.Exceptions; using MQTTnet.Packets; @@ -8,8 +7,43 @@ using MQTTnet.Serializer; namespace MQTTnet.AspNetCore { - public static class ReaderExtensions + public static class ReaderExtensions { + public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) + { + packet = null; + consumed = input.Start; + observed = input.End; + var copy = input; + if (copy.Length < 2) + { + return false; + } + + var fixedheader = copy.First.Span[0]; + if (!TryReadBodyLength(ref copy, out var bodyLength)) + { + return false; + } + + var bodySlice = copy.Slice(0, bodyLength); + packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray(), 0))); + consumed = bodySlice.End; + observed = bodySlice.End; + return true; + } + + private static byte[] GetArray(this in ReadOnlySequence input) + { + if (input.IsSingleSegment) + { + return input.First.Span.ToArray(); + } + + // Should be rare + return input.ToArray(); + } + private static bool TryReadBodyLength(ref ReadOnlySequence input, out int result) { // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. @@ -44,42 +78,5 @@ namespace MQTTnet.AspNetCore result = value; return true; } - - - - public static byte[] GetArray(this in ReadOnlySequence input) - { - if (input.IsSingleSegment) - { - return input.First.Span.ToArray(); - } - - // Should be rare - return input.ToArray(); - } - - public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed) - { - packet = null; - consumed = input.Start; - observed = input.End; - var copy = input; - if (copy.Length < 2) - { - return false; - } - - var fixedheader = copy.First.Span[0]; - if (!TryReadBodyLength(ref copy, out var bodyLength)) - { - return false; - } - - var bodySlice = copy.Slice(0, bodyLength); - packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray()))); - consumed = bodySlice.End; - observed = bodySlice.End; - return true; - } } }