Browse Source

Optimized exception handling and awaits.

release/3.x.x
Christian Kratky 7 years ago
parent
commit
eda8bff10b
6 changed files with 70 additions and 71 deletions
  1. +5
    -16
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  2. +5
    -16
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  3. +15
    -16
      Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
  4. +3
    -14
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  5. +37
    -5
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  6. +5
    -4
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 5
- 16
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs View File

@@ -1,6 +1,5 @@
using MQTTnet.Core.Channel; using MQTTnet.Core.Channel;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System; using System;
using System.IO; using System.IO;
using System.Net.WebSockets; using System.Net.WebSockets;
@@ -11,7 +10,7 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket();
private ClientWebSocket _webSocket;


public Stream RawStream { get; private set; } public Stream RawStream { get; private set; }
public Stream SendStream => RawStream; public Stream SendStream => RawStream;
@@ -19,25 +18,15 @@ namespace MQTTnet.Implementations


public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

RawStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
RawStream = new WebSocketStream(_webSocket);
} }


public Task DisconnectAsync() public Task DisconnectAsync()
{ {
RawStream = null; RawStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
return _webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
} }


public void Dispose() public void Dispose()


+ 5
- 16
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -1,6 +1,5 @@
using MQTTnet.Core.Channel; using MQTTnet.Core.Channel;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System; using System;
using System.IO; using System.IO;
using System.Net.WebSockets; using System.Net.WebSockets;
@@ -11,7 +10,7 @@ namespace MQTTnet.Implementations
{ {
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket();
private ClientWebSocket _webSocket;


public Stream SendStream => RawStream; public Stream SendStream => RawStream;
public Stream ReceiveStream => RawStream; public Stream ReceiveStream => RawStream;
@@ -19,25 +18,15 @@ namespace MQTTnet.Implementations


public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

RawStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
RawStream = new WebSocketStream(_webSocket);
} }


public Task DisconnectAsync() public Task DisconnectAsync()
{ {
RawStream = null; RawStream = null;
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
return _webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
} }


public void Dispose() public void Dispose()


+ 15
- 16
Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs View File

@@ -9,12 +9,26 @@ namespace MQTTnet.Implementations
public class WebSocketStream : Stream public class WebSocketStream : Stream
{ {
private readonly ClientWebSocket _webSocket; private readonly ClientWebSocket _webSocket;
public WebSocketStream(ClientWebSocket webSocket) public WebSocketStream(ClientWebSocket webSocket)
{ {
_webSocket = webSocket; _webSocket = webSocket;
} }


public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => true;

public override long Length => throw new NotSupportedException();

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public override void Flush() public override void Flush()
{ {
} }
@@ -52,21 +66,6 @@ namespace MQTTnet.Implementations
WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
} }


public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;

public override long Length
{
get { throw new NotSupportedException(); }
}

public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}

public override long Seek(long offset, SeekOrigin origin) public override long Seek(long offset, SeekOrigin origin)
{ {
throw new NotSupportedException(); throw new NotSupportedException();


+ 3
- 14
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs View File

@@ -1,6 +1,5 @@
using MQTTnet.Core.Channel; using MQTTnet.Core.Channel;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System; using System;
using System.IO; using System.IO;
using System.Net.WebSockets; using System.Net.WebSockets;
@@ -19,19 +18,9 @@ namespace MQTTnet.Implementations


public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);

RawStream = new WebSocketStream(_webSocket);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
RawStream = new WebSocketStream(_webSocket);
} }


public Task DisconnectAsync() public Task DisconnectAsync()


+ 37
- 5
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -26,11 +26,19 @@ namespace MQTTnet.Core.Adapter


public IMqttPacketSerializer PacketSerializer { get; } public IMqttPacketSerializer PacketSerializer { get; }


public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
try try
{ {
return _channel.ConnectAsync(options).TimeoutAfter(timeout);
await _channel.ConnectAsync(options).TimeoutAfter(timeout);
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -38,11 +46,19 @@ namespace MQTTnet.Core.Adapter
} }
} }


public Task DisconnectAsync()
public async Task DisconnectAsync()
{ {
try try
{ {
return _channel.DisconnectAsync();
await _channel.DisconnectAsync();
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -68,6 +84,14 @@ namespace MQTTnet.Core.Adapter
await _sendTask; // configure await false geneates stackoverflow await _sendTask; // configure await false geneates stackoverflow
await _channel.SendStream.FlushAsync().TimeoutAfter(timeout).ConfigureAwait(false); await _channel.SendStream.FlushAsync().TimeoutAfter(timeout).ConfigureAwait(false);
} }
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception) catch (Exception exception)
{ {
throw new MqttCommunicationException(exception); throw new MqttCommunicationException(exception);
@@ -97,13 +121,21 @@ namespace MQTTnet.Core.Adapter
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet);
return packet; return packet;
} }
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception) catch (Exception exception)
{ {
throw new MqttCommunicationException(exception); throw new MqttCommunicationException(exception);
} }
} }


private async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream)
private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream)
{ {
var header = MqttPacketReader.ReadHeaderFromSource(stream); var header = MqttPacketReader.ReadHeaderFromSource(stream);




+ 5
- 4
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -36,13 +36,14 @@ namespace MQTTnet.TestApp.UniversalWindows
var options = new MqttClientOptions var options = new MqttClientOptions
{ {
Server = Server.Text, Server = Server.Text,
Port = 8080,
UserName = User.Text, UserName = User.Text,
Password = Password.Text, Password = Password.Text,
ClientId = ClientId.Text
ClientId = ClientId.Text,
TlsOptions = { UseTls = UseTls.IsChecked == true },
ConnectionType = MqttConnectionType.Ws
}; };

options.TlsOptions.UseTls = UseTls.IsChecked == true;

try try
{ {
if (_mqttClient != null) if (_mqttClient != null)


Loading…
Cancel
Save