diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 6da1a6f..78c44f9 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -10,7 +10,8 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
- *
+ * [Client] Added support for web socket communication channel (thanks to nowakpiotr)
+* [Core] Performance optimizations (thanks to JanEggers)
Copyright Christian Kratky 2016-2017
MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
similarity index 87%
rename from Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs
rename to Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
index 8d51812..29fcae9 100644
--- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs
+++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
@@ -8,20 +8,15 @@ using System.Threading.Tasks;
namespace MQTTnet.Implementations
{
- public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable
+ public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
- private ClientWebSocket _webSocket;
+ private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;
- public MqttWebSocketsChannel()
- {
- _webSocket = new ClientWebSocket();
- }
-
public async Task ConnectAsync(MqttClientOptions options)
{
_webSocket = null;
@@ -45,10 +40,7 @@ namespace MQTTnet.Implementations
public void Dispose()
{
- if (_webSocket != null)
- {
- _webSocket.Dispose();
- }
+ _webSocket?.Dispose();
}
public Task ReadAsync(byte[] buffer)
@@ -70,8 +62,7 @@ namespace MQTTnet.Implementations
WebSocketReceiveResult response;
do
{
- response =
- await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None);
+ response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None);
temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
index 7f4ab44..7578afe 100644
--- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
+++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
@@ -100,6 +100,7 @@
+
diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
index 7dd7500..8075f92 100644
--- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
@@ -1,5 +1,6 @@
using System;
using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
@@ -12,7 +13,23 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));
- return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
+ return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
+ }
+
+ private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
+ {
+ switch (options.ConnectionType)
+ {
+ case MqttConnectionType.Tcp:
+ case MqttConnectionType.Tls:
+ return new MqttTcpChannel();
+ case MqttConnectionType.Ws:
+ case MqttConnectionType.Wss:
+ return new MqttWebSocketChannel();
+
+ default:
+ throw new NotSupportedException();
+ }
}
}
}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
new file mode 100644
index 0000000..29fcae9
--- /dev/null
+++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
@@ -0,0 +1,110 @@
+using MQTTnet.Core.Channel;
+using MQTTnet.Core.Client;
+using MQTTnet.Core.Exceptions;
+using System;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Implementations
+{
+ public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
+ {
+ private ClientWebSocket _webSocket = new ClientWebSocket();
+ private const int BufferSize = 4096;
+ private const int BufferAmplifier = 20;
+ private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
+ private int WebSocketBufferSize;
+ private int WebSocketBufferOffset;
+
+ public async Task ConnectAsync(MqttClientOptions options)
+ {
+ _webSocket = null;
+
+ try
+ {
+ _webSocket = new ClientWebSocket();
+
+ await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
+ }
+ catch (WebSocketException exception)
+ {
+ throw new MqttCommunicationException(exception);
+ }
+ }
+
+ public async Task DisconnectAsync()
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
+ }
+
+ public void Dispose()
+ {
+ _webSocket?.Dispose();
+ }
+
+ public Task ReadAsync(byte[] buffer)
+ {
+ return Task.WhenAll(ReadToBufferAsync(buffer));
+ }
+
+ private async Task ReadToBufferAsync(byte[] buffer)
+ {
+ var temporaryBuffer = new byte[BufferSize];
+ var offset = 0;
+
+ while (_webSocket.State == WebSocketState.Open)
+ {
+ if (WebSocketBufferSize == 0)
+ {
+ WebSocketBufferOffset = 0;
+
+ WebSocketReceiveResult response;
+ do
+ {
+ response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None);
+
+ temporaryBuffer.CopyTo(WebSocketBuffer, offset);
+ offset += response.Count;
+ temporaryBuffer = new byte[BufferSize];
+ } while (!response.EndOfMessage);
+
+ WebSocketBufferSize = response.Count;
+ if (response.MessageType == WebSocketMessageType.Close)
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
+ }
+
+ Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
+ WebSocketBufferSize -= buffer.Length;
+ WebSocketBufferOffset += buffer.Length;
+ }
+ else
+ {
+ Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
+ WebSocketBufferSize -= buffer.Length;
+ WebSocketBufferOffset += buffer.Length;
+ }
+
+ return;
+ }
+ }
+
+ public Task WriteAsync(byte[] buffer)
+ {
+ if (buffer == null) {
+ throw new ArgumentNullException(nameof(buffer));
+ }
+
+ try
+ {
+ return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true,
+ CancellationToken.None);
+ }
+ catch (WebSocketException exception)
+ {
+ throw new MqttCommunicationException(exception);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
index 1cd6d6f..5857075 100644
--- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
@@ -22,15 +22,15 @@ namespace MQTTnet
{
switch (options.ConnectionType)
{
- case ConnectionTypes.TCP:
- case ConnectionTypes.TLS:
+ case MqttConnectionType.Tcp:
+ case MqttConnectionType.Tls:
return new MqttTcpChannel();
- case ConnectionTypes.WS:
- case ConnectionTypes.WSS:
- return new MqttWebSocketsChannel();
+ case MqttConnectionType.Ws:
+ case MqttConnectionType.Wss:
+ return new MqttWebSocketChannel();
default:
- return null;
+ throw new NotSupportedException();
}
}
}
diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
new file mode 100644
index 0000000..29fcae9
--- /dev/null
+++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
@@ -0,0 +1,110 @@
+using MQTTnet.Core.Channel;
+using MQTTnet.Core.Client;
+using MQTTnet.Core.Exceptions;
+using System;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Implementations
+{
+ public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
+ {
+ private ClientWebSocket _webSocket = new ClientWebSocket();
+ private const int BufferSize = 4096;
+ private const int BufferAmplifier = 20;
+ private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
+ private int WebSocketBufferSize;
+ private int WebSocketBufferOffset;
+
+ public async Task ConnectAsync(MqttClientOptions options)
+ {
+ _webSocket = null;
+
+ try
+ {
+ _webSocket = new ClientWebSocket();
+
+ await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
+ }
+ catch (WebSocketException exception)
+ {
+ throw new MqttCommunicationException(exception);
+ }
+ }
+
+ public async Task DisconnectAsync()
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
+ }
+
+ public void Dispose()
+ {
+ _webSocket?.Dispose();
+ }
+
+ public Task ReadAsync(byte[] buffer)
+ {
+ return Task.WhenAll(ReadToBufferAsync(buffer));
+ }
+
+ private async Task ReadToBufferAsync(byte[] buffer)
+ {
+ var temporaryBuffer = new byte[BufferSize];
+ var offset = 0;
+
+ while (_webSocket.State == WebSocketState.Open)
+ {
+ if (WebSocketBufferSize == 0)
+ {
+ WebSocketBufferOffset = 0;
+
+ WebSocketReceiveResult response;
+ do
+ {
+ response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None);
+
+ temporaryBuffer.CopyTo(WebSocketBuffer, offset);
+ offset += response.Count;
+ temporaryBuffer = new byte[BufferSize];
+ } while (!response.EndOfMessage);
+
+ WebSocketBufferSize = response.Count;
+ if (response.MessageType == WebSocketMessageType.Close)
+ {
+ await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
+ }
+
+ Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
+ WebSocketBufferSize -= buffer.Length;
+ WebSocketBufferOffset += buffer.Length;
+ }
+ else
+ {
+ Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
+ WebSocketBufferSize -= buffer.Length;
+ WebSocketBufferOffset += buffer.Length;
+ }
+
+ return;
+ }
+ }
+
+ public Task WriteAsync(byte[] buffer)
+ {
+ if (buffer == null) {
+ throw new ArgumentNullException(nameof(buffer));
+ }
+
+ try
+ {
+ return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true,
+ CancellationToken.None);
+ }
+ catch (WebSocketException exception)
+ {
+ throw new MqttCommunicationException(exception);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
index 791ee28..3dd8a2d 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
+++ b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
@@ -110,6 +110,7 @@
bin\x64\Release\MQTTnet.XML
+
diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
index 7dd7500..8075f92 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
@@ -1,5 +1,6 @@
using System;
using MQTTnet.Core.Adapter;
+using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
@@ -12,7 +13,23 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));
- return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
+ return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
+ }
+
+ private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
+ {
+ switch (options.ConnectionType)
+ {
+ case MqttConnectionType.Tcp:
+ case MqttConnectionType.Tls:
+ return new MqttTcpChannel();
+ case MqttConnectionType.Ws:
+ case MqttConnectionType.Wss:
+ return new MqttWebSocketChannel();
+
+ default:
+ throw new NotSupportedException();
+ }
}
}
}
\ No newline at end of file
diff --git a/MQTTnet.Core/Client/ConnectionType.cs b/MQTTnet.Core/Client/ConnectionType.cs
deleted file mode 100644
index 1c6abad..0000000
--- a/MQTTnet.Core/Client/ConnectionType.cs
+++ /dev/null
@@ -1,14 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace MQTTnet.Core.Client
-{
- public enum ConnectionTypes
- {
- TCP,
- TLS,
- WS,
- WSS
- }
-}
diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs
index 8b65299..12e7ae9 100644
--- a/MQTTnet.Core/Client/MqttClientOptions.cs
+++ b/MQTTnet.Core/Client/MqttClientOptions.cs
@@ -25,6 +25,6 @@ namespace MQTTnet.Core.Client
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
- public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP;
+ public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp;
}
}
diff --git a/MQTTnet.Core/Client/MqttConnectionType.cs b/MQTTnet.Core/Client/MqttConnectionType.cs
new file mode 100644
index 0000000..636054d
--- /dev/null
+++ b/MQTTnet.Core/Client/MqttConnectionType.cs
@@ -0,0 +1,10 @@
+namespace MQTTnet.Core.Client
+{
+ public enum MqttConnectionType
+ {
+ Tcp,
+ Tls,
+ Ws,
+ Wss
+ }
+}
diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs
index 4644471..e5470d9 100644
--- a/MQTTnet.TestApp.NetCore/Program.cs
+++ b/MQTTnet.TestApp.NetCore/Program.cs
@@ -51,7 +51,7 @@ namespace MQTTnet.TestApp.NetCore
Server = "localhost",
ClientId = "XYZ",
CleanSession = true,
- ConnectionType = ConnectionTypes.WS
+ ConnectionType = MqttConnectionType.Ws
};
var client = new MqttClientFactory().CreateMqttClient(options);