@@ -0,0 +1,119 @@ | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
using System; | |||||
using System.Net.WebSockets; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace MQTTnet.Implementations | |||||
{ | |||||
public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private ClientWebSocket _webSocket = null; | |||||
private const int BufferSize = 4096; | |||||
private const int BufferAmplifier = 20; | |||||
private byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; | |||||
private int WebSocketBufferSize = 0; | |||||
private int WebSocketBufferOffset = 0; | |||||
public MqttWebSocketsChannel() | |||||
{ | |||||
_webSocket = new ClientWebSocket(); | |||||
} | |||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
_webSocket = null; | |||||
try | |||||
{ | |||||
_webSocket = new ClientWebSocket(); | |||||
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); | |||||
} | |||||
catch (WebSocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
finally | |||||
{ | |||||
} | |||||
} | |||||
public async Task DisconnectAsync() | |||||
{ | |||||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
if (_webSocket != null) | |||||
_webSocket.Dispose(); | |||||
} | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
return Task.WhenAll(ReadToBufferAsync(buffer)); | |||||
} | |||||
private async Task ReadToBufferAsync(byte[] buffer) | |||||
{ | |||||
var temporaryBuffer = new byte[BufferSize]; | |||||
var offset = 0; | |||||
while (_webSocket.State == WebSocketState.Open) | |||||
{ | |||||
if (WebSocketBufferSize == 0) | |||||
{ | |||||
WebSocketBufferOffset = 0; | |||||
WebSocketReceiveResult response; | |||||
do | |||||
{ | |||||
response = | |||||
await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None); | |||||
temporaryBuffer.CopyTo(WebSocketBuffer, offset); | |||||
offset += response.Count; | |||||
temporaryBuffer = new byte[BufferSize]; | |||||
} while (!response.EndOfMessage); | |||||
WebSocketBufferSize = response.Count; | |||||
if (response.MessageType == WebSocketMessageType.Close) | |||||
{ | |||||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); | |||||
} | |||||
Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); | |||||
WebSocketBufferSize -= buffer.Length; | |||||
WebSocketBufferOffset += buffer.Length; | |||||
} | |||||
else | |||||
{ | |||||
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); | |||||
WebSocketBufferSize -= buffer.Length; | |||||
WebSocketBufferOffset += buffer.Length; | |||||
} | |||||
return; | |||||
} | |||||
} | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); | |||||
var writeBuffer = System.Text.Encoding.ASCII.GetString(buffer); | |||||
try | |||||
{ | |||||
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, | |||||
CancellationToken.None); | |||||
} | |||||
catch (WebSocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -24,6 +24,8 @@ | |||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="System.Net.Security" Version="4.3.1" /> | <PackageReference Include="System.Net.Security" Version="4.3.1" /> | ||||
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | |||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" /> | |||||
<PackageReference Include="System.Threading.Thread" Version="4.3.0" /> | <PackageReference Include="System.Threading.Thread" Version="4.3.0" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Client; | using MQTTnet.Core.Client; | ||||
using MQTTnet.Core.Serializer; | using MQTTnet.Core.Serializer; | ||||
using MQTTnet.Implementations; | using MQTTnet.Implementations; | ||||
using MQTTnet.Core.Channel; | |||||
namespace MQTTnet | namespace MQTTnet | ||||
{ | { | ||||
@@ -12,7 +13,23 @@ namespace MQTTnet | |||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | if (options == null) throw new ArgumentNullException(nameof(options)); | ||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); | |||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); | |||||
} | |||||
private IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) | |||||
{ | |||||
switch (options.ConnectionType) | |||||
{ | |||||
case ConnectionTypes.TCP: | |||||
case ConnectionTypes.TLS: | |||||
return new MqttTcpChannel(); | |||||
case ConnectionTypes.WS: | |||||
case ConnectionTypes.WSS: | |||||
return new MqttWebSocketsChannel(); | |||||
default: | |||||
return null; | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,14 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace MQTTnet.Core.Client | |||||
{ | |||||
public enum ConnectionTypes | |||||
{ | |||||
TCP, | |||||
TLS, | |||||
WS, | |||||
WSS | |||||
} | |||||
} |
@@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client | |||||
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | ||||
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | ||||
public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; | |||||
} | } | ||||
} | } |
@@ -0,0 +1,13 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" /> | |||||
<ProjectReference Include="..\MQTTnet.Core\MQTTnet.Core.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,171 @@ | |||||
using MQTTnet.Core; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Diagnostics; | |||||
using MQTTnet.Core.Packets; | |||||
using MQTTnet.Core.Protocol; | |||||
using MQTTnet.Core.Server; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace MQTTnet.TestApp.NetCore | |||||
{ | |||||
class Program | |||||
{ | |||||
public static void Main(string[] args) | |||||
{ | |||||
Console.WriteLine("MQTTnet - TestApp.NetFramework"); | |||||
Console.WriteLine("1 = Start client"); | |||||
Console.WriteLine("2 = Start server"); | |||||
var pressedKey = Console.ReadKey(true); | |||||
if (pressedKey.Key == ConsoleKey.D1) | |||||
{ | |||||
Task.Run(() => RunClientAsync(args)); | |||||
Thread.Sleep(Timeout.Infinite); | |||||
} | |||||
else if (pressedKey.Key == ConsoleKey.D2) | |||||
{ | |||||
Task.Run(() => RunServerAsync(args)); | |||||
Thread.Sleep(Timeout.Infinite); | |||||
} | |||||
} | |||||
private static async Task RunClientAsync(string[] arguments) | |||||
{ | |||||
MqttTrace.TraceMessagePublished += (s, e) => | |||||
{ | |||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||||
if (e.Exception != null) | |||||
{ | |||||
Console.WriteLine(e.Exception); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
var options = new MqttClientOptions | |||||
{ | |||||
Server = "localhost", | |||||
ClientId = "XYZ", | |||||
CleanSession = true, | |||||
ConnectionType = ConnectionTypes.WS | |||||
}; | |||||
var client = new MqttClientFactory().CreateMqttClient(options); | |||||
client.ApplicationMessageReceived += (s, e) => | |||||
{ | |||||
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); | |||||
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); | |||||
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); | |||||
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); | |||||
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); | |||||
Console.WriteLine(); | |||||
}; | |||||
client.Connected += async (s, e) => | |||||
{ | |||||
Console.WriteLine("### CONNECTED WITH SERVER ###"); | |||||
await client.SubscribeAsync(new List<TopicFilter> | |||||
{ | |||||
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) | |||||
}); | |||||
Console.WriteLine("### SUBSCRIBED ###"); | |||||
}; | |||||
client.Disconnected += async (s, e) => | |||||
{ | |||||
Console.WriteLine("### DISCONNECTED FROM SERVER ###"); | |||||
await Task.Delay(TimeSpan.FromSeconds(5)); | |||||
try | |||||
{ | |||||
await client.ConnectAsync(); | |||||
} | |||||
catch | |||||
{ | |||||
Console.WriteLine("### RECONNECTING FAILED ###"); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
await client.ConnectAsync(); | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); | |||||
} | |||||
Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); | |||||
while (true) | |||||
{ | |||||
Console.ReadLine(); | |||||
var applicationMessage = new MqttApplicationMessage( | |||||
"A/B/C", | |||||
Encoding.UTF8.GetBytes("Hello World"), | |||||
MqttQualityOfServiceLevel.AtLeastOnce, | |||||
false | |||||
); | |||||
await client.PublishAsync(applicationMessage); | |||||
} | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
Console.WriteLine(exception); | |||||
} | |||||
} | |||||
private static void RunServerAsync(string[] arguments) | |||||
{ | |||||
MqttTrace.TraceMessagePublished += (s, e) => | |||||
{ | |||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||||
if (e.Exception != null) | |||||
{ | |||||
Console.WriteLine(e.Exception); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
var options = new MqttServerOptions | |||||
{ | |||||
ConnectionValidator = p => | |||||
{ | |||||
if (p.ClientId == "SpecialClient") | |||||
{ | |||||
if (p.Username != "USER" || p.Password != "PASS") | |||||
{ | |||||
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; | |||||
} | |||||
} | |||||
return MqttConnectReturnCode.ConnectionAccepted; | |||||
} | |||||
}; | |||||
var mqttServer = new MqttServerFactory().CreateMqttServer(options); | |||||
mqttServer.Start(); | |||||
Console.WriteLine("Press any key to exit."); | |||||
Console.ReadLine(); | |||||
mqttServer.Stop(); | |||||
} | |||||
catch (Exception e) | |||||
{ | |||||
Console.WriteLine(e); | |||||
} | |||||
Console.ReadLine(); | |||||
} | |||||
} | |||||
} |
@@ -1,7 +1,7 @@ | |||||
| | ||||
Microsoft Visual Studio Solution File, Format Version 12.00 | Microsoft Visual Studio Solution File, Format Version 12.00 | ||||
# Visual Studio 15 | # Visual Studio 15 | ||||
VisualStudioVersion = 15.0.26730.8 | |||||
VisualStudioVersion = 15.0.26730.12 | |||||
MinimumVisualStudioVersion = 10.0.40219.1 | MinimumVisualStudioVersion = 10.0.40219.1 | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" | Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" | ||||
EndProject | EndProject | ||||
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution | |||||
README.md = README.md | README.md = README.md | ||||
EndProjectSection | EndProjectSection | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}" | |||||
EndProject | |||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
Debug|Any CPU = Debug|Any CPU | Debug|Any CPU = Debug|Any CPU | ||||
@@ -162,6 +164,22 @@ Global | |||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU | ||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
@@ -173,6 +191,7 @@ Global | |||||
{D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | ||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | ||||
@@ -90,7 +90,7 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\..\Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj"> | <ProjectReference Include="..\..\Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj"> | ||||
<Project>{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}</Project> | |||||
<Project>{a480ef90-0eaa-4d9a-b271-47a9c47f6f7d}</Project> | |||||
<Name>MQTTnet.NetFramework</Name> | <Name>MQTTnet.NetFramework</Name> | ||||
</ProjectReference> | </ProjectReference> | ||||
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj"> | <ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj"> | ||||
@@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework | |||||
private static async Task RunClientAsync(string[] arguments) | private static async Task RunClientAsync(string[] arguments) | ||||
{ | { | ||||
MqttTrace.TraceMessagePublished += (s, e) => | MqttTrace.TraceMessagePublished += (s, e) => | ||||
{ | { | ||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | ||||