Browse Source

Refactor ASP net Core integration and fix breaking change.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
66048931a2
7 changed files with 51 additions and 63 deletions
  1. +2
    -1
      Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs
  2. +0
    -1
      Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs
  3. +0
    -1
      Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs
  4. +0
    -6
      Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs
  5. +11
    -14
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  6. +2
    -1
      Source/MQTTnet.AspnetCore/MqttHostedServer.cs
  7. +36
    -39
      Source/MQTTnet.AspnetCore/ReaderExtensions.cs

+ 2
- 1
Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs View File

@@ -23,9 +23,10 @@ namespace MQTTnet.AspNetCore.Client.Tcp
_ioScheduler = ioScheduler; _ioScheduler = ioScheduler;
} }


public SocketAwaitable GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);


public SocketAwaitable GetAwaiter() => this;

public int GetResult() public int GetResult()
{ {
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));


+ 0
- 1
Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs View File

@@ -24,7 +24,6 @@ namespace MQTTnet.AspNetCore.Client.Tcp
_eventArgs.SetBuffer(buffer); _eventArgs.SetBuffer(buffer);
#else #else
var segment = buffer.GetArray(); var segment = buffer.GetArray();

_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
#endif #endif
if (!_socket.ReceiveAsync(_eventArgs)) if (!_socket.ReceiveAsync(_eventArgs))


+ 0
- 1
Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs View File

@@ -61,7 +61,6 @@ namespace MQTTnet.AspNetCore.Client.Tcp
_eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); _eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
#else #else
var segment = memory.GetArray(); var segment = memory.GetArray();

_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); _eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
#endif #endif
if (!_socket.SendAsync(_eventArgs)) if (!_socket.SendAsync(_eventArgs))


+ 0
- 6
Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs View File

@@ -21,13 +21,9 @@ namespace MQTTnet.AspNetCore.Client.Tcp
private Socket _socket; private Socket _socket;
private IDuplexPipe _application; private IDuplexPipe _application;



public bool IsConnected { get; private set; } public bool IsConnected { get; private set; }

public override string ConnectionId { get; set; } public override string ConnectionId { get; set; }

public override IFeatureCollection Features { get; } public override IFeatureCollection Features { get; }

public override IDictionary<object, object> Items { get; set; } public override IDictionary<object, object> Items { get; set; }
public override IDuplexPipe Transport { get; set; } public override IDuplexPipe Transport { get; set; }


@@ -209,11 +205,9 @@ namespace MQTTnet.AspNetCore.Client.Tcp
} }
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{ {
error = null;
} }
catch (ObjectDisposedException) catch (ObjectDisposedException)
{ {
error = null;
} }
catch (IOException ex) catch (IOException ex)
{ {


+ 11
- 14
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -4,7 +4,6 @@ using MQTTnet.AspNetCore.Client.Tcp;
using MQTTnet.Packets; using MQTTnet.Packets;
using MQTTnet.Serializer; using MQTTnet.Serializer;
using System; using System;
using System.Collections.Generic;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -13,11 +12,6 @@ namespace MQTTnet.AspNetCore
{ {
public class MqttConnectionContext : IMqttChannelAdapter public class MqttConnectionContext : IMqttChannelAdapter
{ {
public IMqttPacketSerializer PacketSerializer { get; }
public ConnectionContext Connection { get; }

public string Endpoint => Connection.ConnectionId;

public MqttConnectionContext( public MqttConnectionContext(
IMqttPacketSerializer packetSerializer, IMqttPacketSerializer packetSerializer,
ConnectionContext connection) ConnectionContext connection)
@@ -26,6 +20,12 @@ namespace MQTTnet.AspNetCore
Connection = connection; Connection = connection;
} }


public string Endpoint => Connection.ConnectionId;
public ConnectionContext Connection { get; }
public IMqttPacketSerializer PacketSerializer { get; }
public event EventHandler ReadingPacketStarted;
public event EventHandler ReadingPacketCompleted;

public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{ {
if (Connection is TcpConnection tcp && !tcp.IsConnected) if (Connection is TcpConnection tcp && !tcp.IsConnected)
@@ -43,10 +43,6 @@ namespace MQTTnet.AspNetCore
return Task.CompletedTask; return Task.CompletedTask;
} }


public void Dispose()
{
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
{ {
var input = Connection.Transport.Input; var input = Connection.Transport.Input;
@@ -106,15 +102,16 @@ namespace MQTTnet.AspNetCore


cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
return null; return null;
}
}


public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{ {
var buffer = PacketSerializer.Serialize(packet); var buffer = PacketSerializer.Serialize(packet);
return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask(); return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask();
} }
public event EventHandler ReadingPacketStarted;
public event EventHandler ReadingPacketCompleted;

public void Dispose()
{
}
} }
} }

+ 2
- 1
Source/MQTTnet.AspnetCore/MqttHostedServer.cs View File

@@ -13,7 +13,8 @@ namespace MQTTnet.AspNetCore
{ {
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;


public MqttHostedServer(IMqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger) : base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer)))
public MqttHostedServer(IMqttServerOptions options, IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
: base(adapters, logger.CreateChildLogger(nameof(MqttHostedServer)))
{ {
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
} }


+ 36
- 39
Source/MQTTnet.AspnetCore/ReaderExtensions.cs View File

@@ -1,6 +1,5 @@
using System; using System;
using System.Buffers; using System.Buffers;
using System.IO;
using MQTTnet.Adapter; using MQTTnet.Adapter;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Packets; using MQTTnet.Packets;
@@ -8,8 +7,43 @@ using MQTTnet.Serializer;


namespace MQTTnet.AspNetCore namespace MQTTnet.AspNetCore
{ {
public static class ReaderExtensions
public static class ReaderExtensions
{ {
public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed)
{
packet = null;
consumed = input.Start;
observed = input.End;
var copy = input;
if (copy.Length < 2)
{
return false;
}

var fixedheader = copy.First.Span[0];
if (!TryReadBodyLength(ref copy, out var bodyLength))
{
return false;
}

var bodySlice = copy.Slice(0, bodyLength);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray(), 0)));
consumed = bodySlice.End;
observed = bodySlice.End;
return true;
}

private static byte[] GetArray(this in ReadOnlySequence<byte> input)
{
if (input.IsSingleSegment)
{
return input.First.Span.ToArray();
}

// Should be rare
return input.ToArray();
}

private static bool TryReadBodyLength(ref ReadOnlySequence<byte> input, out int result) private static bool TryReadBodyLength(ref ReadOnlySequence<byte> input, out int result)
{ {
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html.
@@ -44,42 +78,5 @@ namespace MQTTnet.AspNetCore
result = value; result = value;
return true; return true;
} }



public static byte[] GetArray(this in ReadOnlySequence<byte> input)
{
if (input.IsSingleSegment)
{
return input.First.Span.ToArray();
}

// Should be rare
return input.ToArray();
}

public static bool TryDeserialize(this IMqttPacketSerializer serializer, in ReadOnlySequence<byte> input, out MqttBasePacket packet, out SequencePosition consumed, out SequencePosition observed)
{
packet = null;
consumed = input.Start;
observed = input.End;
var copy = input;
if (copy.Length < 2)
{
return false;
}

var fixedheader = copy.First.Span[0];
if (!TryReadBodyLength(ref copy, out var bodyLength))
{
return false;
}

var bodySlice = copy.Slice(0, bodyLength);
packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray())));
consumed = bodySlice.End;
observed = bodySlice.End;
return true;
}
} }
} }

Loading…
Cancel
Save