From 60bdca8716aeb5c190dfcb3abea6a6bff4b76ba9 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 15 Oct 2017 10:16:26 +0200 Subject: [PATCH] added back buffering for .net and annotated ifdefs with minimum dependency tor remove them --- .../Implementations/MqttServerAdapter.cs | 1 + .../Implementations/MqttTcpChannel.cs | 42 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index a140ca0..aa38653 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -88,6 +88,7 @@ namespace MQTTnet.Implementations { try { +//todo: else branch can be used with min dependency NET46 #if NET45 var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); #else diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index d94ddb4..dfe3654 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -15,6 +15,13 @@ namespace MQTTnet.Implementations { private readonly MqttClientTcpOptions _options; +//todo: this can be used with min dependency NetStandard1.6 +#if NET45 + // ReSharper disable once MemberCanBePrivate.Global + // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global + public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. +#endif + private Socket _socket; private SslStream _sslStream; @@ -34,12 +41,14 @@ namespace MQTTnet.Implementations { _socket = socket ?? throw new ArgumentNullException(nameof(socket)); _sslStream = sslStream; - ReceiveStream = (Stream)sslStream ?? new NetworkStream(socket); + + CreateStreams(socket, sslStream); } - public Stream SendStream => ReceiveStream; + public Stream SendStream { get; private set; } public Stream ReceiveStream { get; private set; } - public Stream RawReceiveStream => ReceiveStream; + public Stream RawReceiveStream { get; private set; } + public static Func CustomCertificateValidationCallback { get; set; } @@ -50,6 +59,7 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } +//todo: else brach can be used with min dependency NET46 #if NET45 await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false); #else @@ -59,13 +69,10 @@ namespace MQTTnet.Implementations if (_options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback); - ReceiveStream = _sslStream; await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false); } - else - { - ReceiveStream = new NetworkStream(_socket); - } + + CreateStreams(_socket, _sslStream); } public Task DisconnectAsync() @@ -129,5 +136,24 @@ namespace MQTTnet.Implementations return certificates; } + + private void CreateStreams(Socket socket, Stream sslStream) + { + RawReceiveStream = sslStream ?? new NetworkStream(socket); + + + //cannot use this as default buffering prevents from receiving the first connect message + //need two streams otherwise read and write have to be synchronized + +//todo: if branch can be used with min dependency NetStandard1.6 +#if NET45 + SendStream = new BufferedStream(RawReceiveStream, BufferSize); + ReceiveStream = new BufferedStream(RawReceiveStream, BufferSize); +#else + SendStream = RawReceiveStream; + ReceiveStream = RawReceiveStream; +#endif + } + } } \ No newline at end of file