Explorar el Código

use raw buffer for frist connect msg then use buffered stream to read more efficient

release/3.x.x
Eggers Jan hace 7 años
padre
commit
c7c1691d75
Se han modificado 9 ficheros con 35 adiciones y 20 borrados
  1. +7
    -4
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  2. +6
    -5
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  3. +1
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +6
    -5
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  5. +4
    -5
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  6. +2
    -0
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  7. +2
    -0
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  8. +6
    -0
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs
  9. +1
    -1
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs

+ 7
- 4
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs Ver fichero

@@ -13,13 +13,15 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private Stream _receiveStream;
private Stream _rawStream;
private Stream _sendStream;
private Stream _receiveStream;
private Socket _socket;
private SslStream _sslStream;

public Stream ReceiveStream => _receiveStream;
public Stream RawStream => _rawStream;
public Stream SendStream => _sendStream;
public Stream ReceiveStream => _receiveStream;

/// <summary>
/// called on client sockets are created in connect
@@ -92,8 +94,9 @@ namespace MQTTnet.Implementations
private void CreateCommStreams( Socket socket, SslStream sslStream )
{
//cannot use this as default buffering prevents from receiving the first connect message
_receiveStream = (Stream)sslStream ?? new NetworkStream( socket );
_sendStream = new BufferedStream( _receiveStream, BufferConstants.Size );
_rawStream = (Stream)sslStream ?? new NetworkStream( socket );
_sendStream = new BufferedStream( _rawStream, BufferConstants.Size );
_receiveStream = new BufferedStream( _rawStream, BufferConstants.Size );
}

private static X509CertificateCollection LoadCertificates(MqttClientOptions options)


+ 6
- 5
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs Ver fichero

@@ -12,10 +12,11 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
public Stream ReceiveStream { get; private set; }

public Stream SendStream { get; private set; }
public Stream RawStream { get; private set; }

public Stream SendStream => RawStream;
public Stream ReceiveStream => RawStream;

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -26,7 +27,7 @@ namespace MQTTnet.Implementations
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

ReceiveStream = SendStream = new WebSocketStream(_webSocket);
RawStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -36,7 +37,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
ReceiveStream = null;
RawStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}



+ 1
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs Ver fichero

@@ -19,6 +19,7 @@ namespace MQTTnet.Implementations


public Stream ReceiveStream => _dataStream;
public Stream RawStream => _dataStream;
public Stream SendStream => _dataStream;

/// <summary>


+ 6
- 5
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs Ver fichero

@@ -12,9 +12,10 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
public Stream SendStream { get; private set; }
public Stream ReceiveStream { get; private set; }

public Stream SendStream => RawStream;
public Stream ReceiveStream => RawStream;
public Stream RawStream { get; private set; }

public async Task ConnectAsync(MqttClientOptions options)
{
@@ -25,7 +26,7 @@ namespace MQTTnet.Implementations
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

SendStream = ReceiveStream = new WebSocketStream(_webSocket);
RawStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
@@ -35,7 +36,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync()
{
SendStream = ReceiveStream = null;
RawStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}



+ 4
- 5
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs Ver fichero

@@ -50,7 +50,7 @@ namespace MQTTnet.Core.Adapter
}
}

await _sendTask.ConfigureAwait( false );
await _sendTask; // configure await false geneates stackoverflow
await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false );
}

@@ -61,11 +61,11 @@ namespace MQTTnet.Core.Adapter
Tuple<MqttPacketHeader, MemoryStream> tuple;
if (timeout > TimeSpan.Zero)
{
tuple = await ReceiveAsync().TimeoutAfter(timeout).ConfigureAwait(false);
tuple = await ReceiveAsync(_channel.RawStream).TimeoutAfter(timeout).ConfigureAwait(false);
}
else
{
tuple = await ReceiveAsync().ConfigureAwait(false);
tuple = await ReceiveAsync(_channel.RawStream).ConfigureAwait(false);
}

var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2);
@@ -79,9 +79,8 @@ namespace MQTTnet.Core.Adapter
return packet;
}

private async Task<Tuple<MqttPacketHeader, MemoryStream>> ReceiveAsync()
private async Task<Tuple<MqttPacketHeader, MemoryStream>> ReceiveAsync(Stream stream)
{
var stream = _channel.ReceiveStream;
var header = MqttPacketReader.ReadHeaderFromSource(stream);

MemoryStream body = null;


+ 2
- 0
MQTTnet.Core/Channel/IMqttCommunicationChannel.cs Ver fichero

@@ -13,5 +13,7 @@ namespace MQTTnet.Core.Channel
Stream SendStream { get; }

Stream ReceiveStream { get; }

Stream RawStream { get; }
}
}

+ 2
- 0
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs Ver fichero

@@ -393,6 +393,8 @@ namespace MQTTnet.Core.Tests

public Stream ReceiveStream => _stream;

public Stream RawStream => _stream;

public Stream SendStream => _stream;

public bool IsConnected { get; } = true;


+ 6
- 0
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs Ver fichero

@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
@@ -46,6 +47,11 @@ namespace MQTTnet.Core.Tests
return Task.Run(() => _incomingPackets.Take());
}

public IEnumerable<MqttBasePacket> ReceivePackets( CancellationToken cancellationToken )
{
return _incomingPackets.GetConsumingEnumerable();
}

private void SendPacketInternal(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));


+ 1
- 1
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs Ver fichero

@@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetFramework
public static async Task RunAsync()
{
var server = Task.Run(() => RunServerAsync());
var client = Task.Run(() => RunClientAsync(500, TimeSpan.FromMilliseconds(10)));
var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10)));

await Task.WhenAll(server, client).ConfigureAwait(false);
}


Cargando…
Cancelar
Guardar