Преглед изворни кода

Refactoring of WebSocket support

release/3.x.x
Christian Kratky пре 7 година
родитељ
комит
1c30d14910
13 измењених фајлова са 282 додато и 38 уклоњено
  1. +2
    -1
      Build/MQTTnet.nuspec
  2. +4
    -13
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  3. +1
    -0
      Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
  4. +18
    -1
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  5. +110
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  6. +6
    -6
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  7. +110
    -0
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  8. +1
    -0
      Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
  9. +18
    -1
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  10. +0
    -14
      MQTTnet.Core/Client/ConnectionType.cs
  11. +1
    -1
      MQTTnet.Core/Client/MqttClientOptions.cs
  12. +10
    -0
      MQTTnet.Core/Client/MqttConnectionType.cs
  13. +1
    -1
      MQTTnet.TestApp.NetCore/Program.cs

+ 2
- 1
Build/MQTTnet.nuspec Прегледај датотеку

@@ -10,7 +10,8 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>*
<releaseNotes>* [Client] Added support for web socket communication channel (thanks to nowakpiotr)
* [Core] Performance optimizations (thanks to JanEggers)
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino</tags>


Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs → Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs Прегледај датотеку

@@ -8,20 +8,15 @@ using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket;
private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;

public MqttWebSocketsChannel()
{
_webSocket = new ClientWebSocket();
}

public async Task ConnectAsync(MqttClientOptions options)
{
_webSocket = null;
@@ -45,10 +40,7 @@ namespace MQTTnet.Implementations

public void Dispose()
{
if (_webSocket != null)
{
_webSocket.Dispose();
}
_webSocket?.Dispose();
}

public Task ReadAsync(byte[] buffer)
@@ -70,8 +62,7 @@ namespace MQTTnet.Implementations
WebSocketReceiveResult response;
do
{
response =
await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);

temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;

+ 1
- 0
Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj Прегледај датотеку

@@ -100,6 +100,7 @@
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="Implementations\MqttWebSocketChannel.cs" />
<Compile Include="MqttClientFactory.cs" />
<Compile Include="MqttServerFactory.cs" />
<Compile Include="Implementations\MqttServerAdapter.cs" />


+ 18
- 1
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs Прегледај датотеку

@@ -1,5 +1,6 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
@@ -12,7 +13,23 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case MqttConnectionType.Tcp:
case MqttConnectionType.Tls:
return new MqttTcpChannel();
case MqttConnectionType.Ws:
case MqttConnectionType.Wss:
return new MqttWebSocketChannel();

default:
throw new NotSupportedException();
}
}
}
}

+ 110
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs Прегледај датотеку

@@ -0,0 +1,110 @@
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;

public async Task ConnectAsync(MqttClientOptions options)
{
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task DisconnectAsync()
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

public void Dispose()
{
_webSocket?.Dispose();
}

public Task ReadAsync(byte[] buffer)
{
return Task.WhenAll(ReadToBufferAsync(buffer));
}

private async Task ReadToBufferAsync(byte[] buffer)
{
var temporaryBuffer = new byte[BufferSize];
var offset = 0;

while (_webSocket.State == WebSocketState.Open)
{
if (WebSocketBufferSize == 0)
{
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);

temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);

WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
{
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}

return;
}
}

public Task WriteAsync(byte[] buffer)
{
if (buffer == null) {
throw new ArgumentNullException(nameof(buffer));
}

try
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}
}
}

+ 6
- 6
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs Прегледај датотеку

@@ -22,15 +22,15 @@ namespace MQTTnet
{
switch (options.ConnectionType)
{
case ConnectionTypes.TCP:
case ConnectionTypes.TLS:
case MqttConnectionType.Tcp:
case MqttConnectionType.Tls:
return new MqttTcpChannel();
case ConnectionTypes.WS:
case ConnectionTypes.WSS:
return new MqttWebSocketsChannel();
case MqttConnectionType.Ws:
case MqttConnectionType.Wss:
return new MqttWebSocketChannel();

default:
return null;
throw new NotSupportedException();
}
}
}


+ 110
- 0
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs Прегледај датотеку

@@ -0,0 +1,110 @@
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;

public async Task ConnectAsync(MqttClientOptions options)
{
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task DisconnectAsync()
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

public void Dispose()
{
_webSocket?.Dispose();
}

public Task ReadAsync(byte[] buffer)
{
return Task.WhenAll(ReadToBufferAsync(buffer));
}

private async Task ReadToBufferAsync(byte[] buffer)
{
var temporaryBuffer = new byte[BufferSize];
var offset = 0;

while (_webSocket.State == WebSocketState.Open)
{
if (WebSocketBufferSize == 0)
{
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);

temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);

WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
{
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}

return;
}
}

public Task WriteAsync(byte[] buffer)
{
if (buffer == null) {
throw new ArgumentNullException(nameof(buffer));
}

try
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}
}
}

+ 1
- 0
Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj Прегледај датотеку

@@ -110,6 +110,7 @@
<DocumentationFile>bin\x64\Release\MQTTnet.XML</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Compile Include="Implementations\MqttWebSocketChannel.cs" />
<Compile Include="MqttClientFactory.cs" />
<Compile Include="Implementations\MqttServerAdapter.cs" />
<Compile Include="MqttServerFactory.cs" />


+ 18
- 1
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs Прегледај датотеку

@@ -1,5 +1,6 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
@@ -12,7 +13,23 @@ namespace MQTTnet
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case MqttConnectionType.Tcp:
case MqttConnectionType.Tls:
return new MqttTcpChannel();
case MqttConnectionType.Ws:
case MqttConnectionType.Wss:
return new MqttWebSocketChannel();

default:
throw new NotSupportedException();
}
}
}
}

+ 0
- 14
MQTTnet.Core/Client/ConnectionType.cs Прегледај датотеку

@@ -1,14 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace MQTTnet.Core.Client
{
public enum ConnectionTypes
{
TCP,
TLS,
WS,
WSS
}
}

+ 1
- 1
MQTTnet.Core/Client/MqttClientOptions.cs Прегледај датотеку

@@ -25,6 +25,6 @@ namespace MQTTnet.Core.Client

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP;
public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp;
}
}

+ 10
- 0
MQTTnet.Core/Client/MqttConnectionType.cs Прегледај датотеку

@@ -0,0 +1,10 @@
namespace MQTTnet.Core.Client
{
public enum MqttConnectionType
{
Tcp,
Tls,
Ws,
Wss
}
}

+ 1
- 1
MQTTnet.TestApp.NetCore/Program.cs Прегледај датотеку

@@ -51,7 +51,7 @@ namespace MQTTnet.TestApp.NetCore
Server = "localhost",
ClientId = "XYZ",
CleanSession = true,
ConnectionType = ConnectionTypes.WS
ConnectionType = MqttConnectionType.Ws
};

var client = new MqttClientFactory().CreateMqttClient(options);


Loading…
Откажи
Сачувај