From eda8bff10be341578736fc9405617350f7a92813 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 15 Sep 2017 15:58:48 +0200 Subject: [PATCH] Optimized exception handling and awaits. --- .../Implementations/MqttWebSocketChannel.cs | 21 +++------- .../Implementations/MqttWebSocketChannel.cs | 21 +++------- .../Implementations/WebSocketStream.cs | 31 +++++++------- .../Implementations/MqttWebSocketChannel.cs | 17 ++------ .../MqttChannelCommunicationAdapter.cs | 42 ++++++++++++++++--- .../MainPage.xaml.cs | 9 ++-- 6 files changed, 70 insertions(+), 71 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index b639fe0..332180e 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -1,6 +1,5 @@ using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Exceptions; using System; using System.IO; using System.Net.WebSockets; @@ -11,7 +10,7 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { - private ClientWebSocket _webSocket = new ClientWebSocket(); + private ClientWebSocket _webSocket; public Stream RawStream { get; private set; } public Stream SendStream => RawStream; @@ -19,25 +18,15 @@ namespace MQTTnet.Implementations public async Task ConnectAsync(MqttClientOptions options) { - _webSocket = null; - - try - { - _webSocket = new ClientWebSocket(); - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - - RawStream = new WebSocketStream(_webSocket); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } + _webSocket = new ClientWebSocket(); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + RawStream = new WebSocketStream(_webSocket); } public Task DisconnectAsync() { RawStream = null; - return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + return _webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } public void Dispose() diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index e452cda..1ca501a 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -1,6 +1,5 @@ using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Exceptions; using System; using System.IO; using System.Net.WebSockets; @@ -11,7 +10,7 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { - private ClientWebSocket _webSocket = new ClientWebSocket(); + private ClientWebSocket _webSocket; public Stream SendStream => RawStream; public Stream ReceiveStream => RawStream; @@ -19,25 +18,15 @@ namespace MQTTnet.Implementations public async Task ConnectAsync(MqttClientOptions options) { - _webSocket = null; - - try - { - _webSocket = new ClientWebSocket(); - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - - RawStream = new WebSocketStream(_webSocket); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } + _webSocket = new ClientWebSocket(); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + RawStream = new WebSocketStream(_webSocket); } public Task DisconnectAsync() { RawStream = null; - return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + return _webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } public void Dispose() diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index d912148..6efe750 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -9,12 +9,26 @@ namespace MQTTnet.Implementations public class WebSocketStream : Stream { private readonly ClientWebSocket _webSocket; - + public WebSocketStream(ClientWebSocket webSocket) { _webSocket = webSocket; } + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + public override void Flush() { } @@ -52,21 +66,6 @@ namespace MQTTnet.Implementations WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override bool CanRead => true; - public override bool CanSeek => false; - public override bool CanWrite => true; - - public override long Length - { - get { throw new NotSupportedException(); } - } - - public override long Position - { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } - } - public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs index b639fe0..d8572ad 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -1,6 +1,5 @@ using MQTTnet.Core.Channel; using MQTTnet.Core.Client; -using MQTTnet.Core.Exceptions; using System; using System.IO; using System.Net.WebSockets; @@ -19,19 +18,9 @@ namespace MQTTnet.Implementations public async Task ConnectAsync(MqttClientOptions options) { - _webSocket = null; - - try - { - _webSocket = new ClientWebSocket(); - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - - RawStream = new WebSocketStream(_webSocket); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } + _webSocket = new ClientWebSocket(); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + RawStream = new WebSocketStream(_webSocket); } public Task DisconnectAsync() diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index a0239d5..9f99349 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -26,11 +26,19 @@ namespace MQTTnet.Core.Adapter public IMqttPacketSerializer PacketSerializer { get; } - public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) + public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { try { - return _channel.ConnectAsync(options).TimeoutAfter(timeout); + await _channel.ConnectAsync(options).TimeoutAfter(timeout); + } + catch (MqttCommunicationTimedOutException) + { + throw; + } + catch (MqttCommunicationException) + { + throw; } catch (Exception exception) { @@ -38,11 +46,19 @@ namespace MQTTnet.Core.Adapter } } - public Task DisconnectAsync() + public async Task DisconnectAsync() { try { - return _channel.DisconnectAsync(); + await _channel.DisconnectAsync(); + } + catch (MqttCommunicationTimedOutException) + { + throw; + } + catch (MqttCommunicationException) + { + throw; } catch (Exception exception) { @@ -68,6 +84,14 @@ namespace MQTTnet.Core.Adapter await _sendTask; // configure await false geneates stackoverflow await _channel.SendStream.FlushAsync().TimeoutAfter(timeout).ConfigureAwait(false); } + catch (MqttCommunicationTimedOutException) + { + throw; + } + catch (MqttCommunicationException) + { + throw; + } catch (Exception exception) { throw new MqttCommunicationException(exception); @@ -97,13 +121,21 @@ namespace MQTTnet.Core.Adapter MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); return packet; } + catch (MqttCommunicationTimedOutException) + { + throw; + } + catch (MqttCommunicationException) + { + throw; + } catch (Exception exception) { throw new MqttCommunicationException(exception); } } - private async Task ReceiveAsync(Stream stream) + private static async Task ReceiveAsync(Stream stream) { var header = MqttPacketReader.ReadHeaderFromSource(stream); diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index aa40c8b..4b25948 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -36,13 +36,14 @@ namespace MQTTnet.TestApp.UniversalWindows var options = new MqttClientOptions { Server = Server.Text, + Port = 8080, UserName = User.Text, Password = Password.Text, - ClientId = ClientId.Text + ClientId = ClientId.Text, + TlsOptions = { UseTls = UseTls.IsChecked == true }, + ConnectionType = MqttConnectionType.Ws }; - - options.TlsOptions.UseTls = UseTls.IsChecked == true; - + try { if (_mqttClient != null)