diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index a1ee8c5..9c3679a 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -13,13 +13,15 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private Stream _receiveStream; + private Stream _rawStream; private Stream _sendStream; + private Stream _receiveStream; private Socket _socket; private SslStream _sslStream; - public Stream ReceiveStream => _receiveStream; + public Stream RawStream => _rawStream; public Stream SendStream => _sendStream; + public Stream ReceiveStream => _receiveStream; /// /// called on client sockets are created in connect @@ -92,8 +94,9 @@ namespace MQTTnet.Implementations private void CreateCommStreams( Socket socket, SslStream sslStream ) { //cannot use this as default buffering prevents from receiving the first connect message - _receiveStream = (Stream)sslStream ?? new NetworkStream( socket ); - _sendStream = new BufferedStream( _receiveStream, BufferConstants.Size ); + _rawStream = (Stream)sslStream ?? new NetworkStream( socket ); + _sendStream = new BufferedStream( _rawStream, BufferConstants.Size ); + _receiveStream = new BufferedStream( _rawStream, BufferConstants.Size ); } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 02cc32e..bc224cb 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -12,10 +12,11 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream ReceiveStream { get; private set; } - public Stream SendStream { get; private set; } + public Stream RawStream { get; private set; } + + public Stream SendStream => RawStream; + public Stream ReceiveStream => RawStream; public async Task ConnectAsync(MqttClientOptions options) { @@ -26,7 +27,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - ReceiveStream = SendStream = new WebSocketStream(_webSocket); + RawStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -36,7 +37,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - ReceiveStream = null; + RawStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index b8916e0..536ebf6 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -19,6 +19,7 @@ namespace MQTTnet.Implementations public Stream ReceiveStream => _dataStream; + public Stream RawStream => _dataStream; public Stream SendStream => _dataStream; /// diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index abce128..e452cda 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -12,9 +12,10 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream SendStream { get; private set; } - public Stream ReceiveStream { get; private set; } + + public Stream SendStream => RawStream; + public Stream ReceiveStream => RawStream; + public Stream RawStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -25,7 +26,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - SendStream = ReceiveStream = new WebSocketStream(_webSocket); + RawStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -35,7 +36,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - SendStream = ReceiveStream = null; + RawStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index da0d442..09875e2 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -50,7 +50,7 @@ namespace MQTTnet.Core.Adapter } } - await _sendTask.ConfigureAwait( false ); + await _sendTask; // configure await false geneates stackoverflow await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false ); } @@ -61,11 +61,11 @@ namespace MQTTnet.Core.Adapter Tuple tuple; if (timeout > TimeSpan.Zero) { - tuple = await ReceiveAsync().TimeoutAfter(timeout).ConfigureAwait(false); + tuple = await ReceiveAsync(_channel.RawStream).TimeoutAfter(timeout).ConfigureAwait(false); } else { - tuple = await ReceiveAsync().ConfigureAwait(false); + tuple = await ReceiveAsync(_channel.RawStream).ConfigureAwait(false); } var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2); @@ -79,9 +79,8 @@ namespace MQTTnet.Core.Adapter return packet; } - private async Task> ReceiveAsync() + private async Task> ReceiveAsync(Stream stream) { - var stream = _channel.ReceiveStream; var header = MqttPacketReader.ReadHeaderFromSource(stream); MemoryStream body = null; diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 6b0854d..80c1308 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -13,5 +13,7 @@ namespace MQTTnet.Core.Channel Stream SendStream { get; } Stream ReceiveStream { get; } + + Stream RawStream { get; } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 14da310..e0ae8fa 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -393,6 +393,8 @@ namespace MQTTnet.Core.Tests public Stream ReceiveStream => _stream; + public Stream RawStream => _stream; + public Stream SendStream => _stream; public bool IsConnected { get; } = true; diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 50eb8a7..e3e9028 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; @@ -46,6 +47,11 @@ namespace MQTTnet.Core.Tests return Task.Run(() => _incomingPackets.Take()); } + public IEnumerable ReceivePackets( CancellationToken cancellationToken ) + { + return _incomingPackets.GetConsumingEnumerable(); + } + private void SendPacketInternal(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index fde64d9..049e9c9 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetFramework public static async Task RunAsync() { var server = Task.Run(() => RunServerAsync()); - var client = Task.Run(() => RunClientAsync(500, TimeSpan.FromMilliseconds(10))); + var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10))); await Task.WhenAll(server, client).ConfigureAwait(false); }