|
|
@@ -18,6 +18,8 @@ namespace MQTTnet.AspNetCore |
|
|
|
PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter)); |
|
|
|
Connection = connection ?? throw new ArgumentNullException(nameof(connection)); |
|
|
|
} |
|
|
|
private PipeReader _input; |
|
|
|
private PipeWriter _output; |
|
|
|
|
|
|
|
public string Endpoint => Connection.ConnectionId; |
|
|
|
public bool IsSecureConnection => false; // TODO: Fix detection (WS vs. WSS). |
|
|
@@ -33,20 +35,21 @@ namespace MQTTnet.AspNetCore |
|
|
|
|
|
|
|
private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); |
|
|
|
|
|
|
|
public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) |
|
|
|
public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
if (Connection is TcpConnection tcp && !tcp.IsConnected) |
|
|
|
{ |
|
|
|
return tcp.StartAsync(); |
|
|
|
await tcp.StartAsync().ConfigureAwait(false); |
|
|
|
} |
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
|
_input = Connection.Transport.Input; |
|
|
|
_output = Connection.Transport.Output; |
|
|
|
} |
|
|
|
|
|
|
|
public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
Connection.Transport.Input.Complete(); |
|
|
|
Connection.Transport.Output.Complete(); |
|
|
|
_input?.Complete(); |
|
|
|
_output?.Complete(); |
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
|
} |
|
|
@@ -54,7 +57,6 @@ namespace MQTTnet.AspNetCore |
|
|
|
public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
var input = Connection.Transport.Input; |
|
|
|
var reader = new SpanBasedMqttPacketBodyReader(); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|