Browse Source

Merge pull request #29 from Inventum24/WebSockets

Support for websockets
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
960900dafa
10 changed files with 364 additions and 4 deletions
  1. +119
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs
  2. +2
    -0
      Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
  3. +21
    -2
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  4. +14
    -0
      MQTTnet.Core/Client/ConnectionType.cs
  5. +2
    -0
      MQTTnet.Core/Client/MqttClientOptions.cs
  6. +13
    -0
      MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
  7. +171
    -0
      MQTTnet.TestApp.NetCore/Program.cs
  8. +20
    -1
      MQTTnet.sln
  9. +1
    -1
      Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj
  10. +1
    -0
      Tests/MQTTnet.TestApp.NetFramework/Program.cs

+ 119
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs View File

@@ -0,0 +1,119 @@
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable
{
private ClientWebSocket _webSocket;
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize;
private int WebSocketBufferOffset;

public MqttWebSocketsChannel()
{
_webSocket = new ClientWebSocket();
}

public async Task ConnectAsync(MqttClientOptions options)
{
_webSocket = null;

try
{
_webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task DisconnectAsync()
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

public void Dispose()
{
if (_webSocket != null)
{
_webSocket.Dispose();
}
}

public Task ReadAsync(byte[] buffer)
{
return Task.WhenAll(ReadToBufferAsync(buffer));
}

private async Task ReadToBufferAsync(byte[] buffer)
{
var temporaryBuffer = new byte[BufferSize];
var offset = 0;

while (_webSocket.State == WebSocketState.Open)
{
if (WebSocketBufferSize == 0)
{
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response =
await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);

temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);

WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
{
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}

return;
}
}

public Task WriteAsync(byte[] buffer)
{
if (buffer == null) {
throw new ArgumentNullException(nameof(buffer));
}

try
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
}
catch (WebSocketException exception)
{
throw new MqttCommunicationException(exception);
}
}
}
}

+ 2
- 0
Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj View File

@@ -24,6 +24,8 @@

<ItemGroup>
<PackageReference Include="System.Net.Security" Version="4.3.1" />
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" />
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>


+ 21
- 2
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs View File

@@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;
using MQTTnet.Implementations;
using MQTTnet.Core.Channel;

namespace MQTTnet
{
@@ -10,9 +11,27 @@ namespace MQTTnet
{
public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (options == null) {
throw new ArgumentNullException(nameof(options));
}

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer()));
}

private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options)
{
switch (options.ConnectionType)
{
case ConnectionTypes.TCP:
case ConnectionTypes.TLS:
return new MqttTcpChannel();
case ConnectionTypes.WS:
case ConnectionTypes.WSS:
return new MqttWebSocketsChannel();

default:
return null;
}
}
}
}

+ 14
- 0
MQTTnet.Core/Client/ConnectionType.cs View File

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace MQTTnet.Core.Client
{
public enum ConnectionTypes
{
TCP,
TLS,
WS,
WSS
}
}

+ 2
- 0
MQTTnet.Core/Client/MqttClientOptions.cs View File

@@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP;
}
}

+ 13
- 0
MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj View File

@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" />
<ProjectReference Include="..\MQTTnet.Core\MQTTnet.Core.csproj" />
</ItemGroup>

</Project>

+ 171
- 0
MQTTnet.TestApp.NetCore/Program.cs View File

@@ -0,0 +1,171 @@
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.TestApp.NetCore
{
public static class Program
{
public static void Main(string[] args)
{
Console.WriteLine("MQTTnet - TestApp.NetFramework");
Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server");
var pressedKey = Console.ReadKey(true);
if (pressedKey.Key == ConsoleKey.D1)
{
Task.Run(() => RunClientAsync(args));
Thread.Sleep(Timeout.Infinite);
}
else if (pressedKey.Key == ConsoleKey.D2)
{
Task.Run(() => RunServerAsync(args));
Thread.Sleep(Timeout.Infinite);
}
}

private static async Task RunClientAsync(string[] arguments)
{

MqttTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}
};

try
{
var options = new MqttClientOptions
{
Server = "localhost",
ClientId = "XYZ",
CleanSession = true,
ConnectionType = ConnectionTypes.WS
};

var client = new MqttClientFactory().CreateMqttClient(options);
client.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
};

client.Connected += async (s, e) =>
{
Console.WriteLine("### CONNECTED WITH SERVER ###");

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
});

Console.WriteLine("### SUBSCRIBED ###");
};

client.Disconnected += async (s, e) =>
{
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));

try
{
await client.ConnectAsync();
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
};

try
{
await client.ConnectAsync();
}
catch (Exception exception)
{
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
}

Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

while (true)
{
Console.ReadLine();

var applicationMessage = new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);

await client.PublishAsync(applicationMessage);
}
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
}

private static void RunServerAsync(string[] arguments)
{
MqttTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}
};

try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
}
};

var mqttServer = new MqttServerFactory().CreateMqttServer(options);
mqttServer.Start();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();

mqttServer.Stop();
}
catch (Exception e)
{
Console.WriteLine(e);
}

Console.ReadLine();
}
}
}

+ 20
- 1
MQTTnet.sln View File

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.8
VisualStudioVersion = 15.0.26730.12
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.md = README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -162,6 +164,22 @@ Global
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -173,6 +191,7 @@ Global
{D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}


+ 1
- 1
Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj View File

@@ -90,7 +90,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj">
<Project>{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}</Project>
<Project>{a480ef90-0eaa-4d9a-b271-47a9c47f6f7d}</Project>
<Name>MQTTnet.NetFramework</Name>
</ProjectReference>
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj">


+ 1
- 0
Tests/MQTTnet.TestApp.NetFramework/Program.cs View File

@@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework

private static async Task RunClientAsync(string[] arguments)
{

MqttTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");


Loading…
Cancel
Save