瀏覽代碼

Add server implementation

release/3.x.x
Christian Kratky 7 年之前
父節點
當前提交
ee25228c0b
共有 100 個檔案被更改,包括 1925 行新增471 行删除
  1. +18
    -25
      .gitignore
  2. +23
    -0
      Frameworks/MQTTnet.NetCore/MQTTnet.NetCore.csproj
  3. +2
    -2
      Frameworks/MQTTnet.NetCore/MqttClientFactory.cs
  4. +56
    -0
      Frameworks/MQTTnet.NetCore/MqttServerAdapter.cs
  5. +15
    -0
      Frameworks/MQTTnet.NetCore/MqttServerFactory.cs
  6. +81
    -0
      Frameworks/MQTTnet.NetCore/MqttTcpChannel.cs
  7. +9
    -5
      Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj
  8. +2
    -2
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  9. +56
    -0
      Frameworks/MQTTnet.NetFramework/MqttServerAdapter.cs
  10. +15
    -0
      Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs
  11. +85
    -0
      Frameworks/MQTTnet.NetFramework/MqttTcpChannel.cs
  12. +2
    -2
      Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
  13. +6
    -4
      Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj
  14. +17
    -0
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  15. +54
    -0
      Frameworks/MQTTnet.UniversalWindows/MqttServerAdapter.cs
  16. +15
    -0
      Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs
  17. +13
    -3
      Frameworks/MQTTnet.UniversalWindows/MqttTcpChannel.cs
  18. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
  19. +0
    -0
      Frameworks/MQTTnet.UniversalWindows/Properties/MQTTnet.Universal.rd.xml
  20. +0
    -0
      Frameworks/MQTTnet.UniversalWindows/project.json
  21. +0
    -7
      MQTT.NET.Core/Client/MqttClientStatistics.cs
  22. +0
    -32
      MQTT.NET.Core/Client/MqttPacketAwaiter.cs
  23. +0
    -39
      MQTT.NET.Core/Diagnostics/MqttTrace.cs
  24. +0
    -21
      MQTT.NET.Core/DictionaryExtensions.cs
  25. +0
    -6
      MQTT.NET.Core/Packets/MqttBasePacket.cs
  26. +0
    -7
      MQTT.NET.Core/Packets/MqttPubAckPacket.cs
  27. +0
    -7
      MQTT.NET.Core/Packets/MqttPubCompPacket.cs
  28. +0
    -7
      MQTT.NET.Core/Packets/MqttPubRelPacket.cs
  29. +0
    -19
      MQTT.NET.Core/Packets/MqttPublishPacket.cs
  30. +0
    -12
      MQTT.NET.Core/Packets/MqttSubAckPacket.cs
  31. +0
    -11
      MQTT.NET.Core/Packets/MqttSubscribePacket.cs
  32. +2
    -2
      MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs
  33. +14
    -0
      MQTTnet.Core/Adapter/IMqttServerAdapter.cs
  34. +29
    -13
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  35. +17
    -0
      MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs
  36. +0
    -0
      MQTTnet.Core/Adapter/MqttConnectingFailedException.cs
  37. +1
    -1
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  38. +120
    -96
      MQTTnet.Core/Client/MqttClient.cs
  39. +0
    -0
      MQTTnet.Core/Client/MqttClientOptions.cs
  40. +16
    -0
      MQTTnet.Core/Client/MqttPacketAwaiter.cs
  41. +43
    -15
      MQTTnet.Core/Client/MqttPacketDispatcher.cs
  42. +0
    -0
      MQTTnet.Core/Client/MqttSubscribeResult.cs
  43. +44
    -0
      MQTTnet.Core/Diagnostics/MqttTrace.cs
  44. +0
    -0
      MQTTnet.Core/Diagnostics/MqttTraceLevel.cs
  45. +0
    -0
      MQTTnet.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs
  46. +0
    -0
      MQTTnet.Core/Exceptions/MqttCommunicationException.cs
  47. +0
    -0
      MQTTnet.Core/Exceptions/MqttCommunicationTimedOutException.cs
  48. +2
    -2
      MQTTnet.Core/Exceptions/MqttProtocolViolationException.cs
  49. +24
    -0
      MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs
  50. +2
    -2
      MQTTnet.Core/Internal/TaskExtensions.cs
  51. +17
    -6
      MQTTnet.Core/MQTTnet.Core.csproj
  52. +2
    -5
      MQTTnet.Core/MqttApplicationMessage.cs
  53. +1
    -3
      MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs
  54. +7
    -0
      MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs
  55. +25
    -0
      MQTTnet.Core/Packets/MqttBasePacket.cs
  56. +1
    -1
      MQTTnet.Core/Packets/MqttBasePublishPacket.cs
  57. +0
    -0
      MQTTnet.Core/Packets/MqttConnAckPacket.cs
  58. +5
    -0
      MQTTnet.Core/Packets/MqttConnectPacket.cs
  59. +0
    -0
      MQTTnet.Core/Packets/MqttDisconnectPacket.cs
  60. +1
    -3
      MQTTnet.Core/Packets/MqttPingReqPacket.cs
  61. +0
    -0
      MQTTnet.Core/Packets/MqttPingRespPacket.cs
  62. +6
    -0
      MQTTnet.Core/Packets/MqttPubAckPacket.cs
  63. +6
    -0
      MQTTnet.Core/Packets/MqttPubCompPacket.cs
  64. +6
    -0
      MQTTnet.Core/Packets/MqttPubRecPacket.cs
  65. +6
    -0
      MQTTnet.Core/Packets/MqttPubRelPacket.cs
  66. +24
    -0
      MQTTnet.Core/Packets/MqttPublishPacket.cs
  67. +20
    -0
      MQTTnet.Core/Packets/MqttSubAckPacket.cs
  68. +19
    -0
      MQTTnet.Core/Packets/MqttSubscribePacket.cs
  69. +1
    -1
      MQTTnet.Core/Packets/MqttUnsubAckPacket.cs
  70. +1
    -1
      MQTTnet.Core/Packets/MqttUnsubscribe.cs
  71. +0
    -0
      MQTTnet.Core/Packets/TopicFilter.cs
  72. +2
    -2
      MQTTnet.Core/Properties/AssemblyInfo.cs
  73. +0
    -0
      MQTTnet.Core/Protocol/MqttConnectReturnCode.cs
  74. +0
    -0
      MQTTnet.Core/Protocol/MqttControlPacketType.cs
  75. +0
    -0
      MQTTnet.Core/Protocol/MqttQualityOfServiceLevel.cs
  76. +0
    -0
      MQTTnet.Core/Protocol/MqttSubscribeReturnCode.cs
  77. +0
    -0
      MQTTnet.Core/Serializer/ByteReader.cs
  78. +0
    -0
      MQTTnet.Core/Serializer/ByteWriter.cs
  79. +29
    -29
      MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
  80. +2
    -2
      MQTTnet.Core/Serializer/IMqttPacketSerializer.cs
  81. +3
    -6
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  82. +1
    -1
      MQTTnet.Core/Serializer/MqttPacketWriter.cs
  83. +22
    -0
      MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
  84. +179
    -0
      MQTTnet.Core/Server/MqttClientSession.cs
  85. +93
    -0
      MQTTnet.Core/Server/MqttClientSessionManager.cs
  86. +61
    -0
      MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
  87. +129
    -0
      MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs
  88. +67
    -0
      MQTTnet.Core/Server/MqttServer.cs
  89. +17
    -0
      MQTTnet.Core/Server/MqttServerOptions.cs
  90. +53
    -0
      MQTTnet.Core/Server/MqttTopicFilterComparer.cs
  91. +0
    -46
      MQTTnet.NET/MqttTcpChannel.cs
  92. +55
    -7
      MQTTnet.sln
  93. +32
    -1
      README.md
  94. +0
    -0
      Tests/MQTTnet.Core.Tests/ByteReaderTests.cs
  95. +0
    -0
      Tests/MQTTnet.Core.Tests/ByteWriterTests.cs
  96. +1
    -1
      Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs
  97. +10
    -11
      Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj
  98. +161
    -0
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  99. +74
    -0
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
  100. +1
    -1
      Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs

+ 18
- 25
.gitignore 查看文件

@@ -15,12 +15,12 @@
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
[Xx]64/
[Xx]86/
[Bb]uild/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/

# Visual Studio 2015 cache/options directory
.vs/
@@ -81,7 +81,6 @@ ipch/
*.sdf
*.cachefile
*.VC.db
*.VC.VC.opendb

# Visual Studio profiler
*.psess
@@ -140,15 +139,12 @@ publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# TODO: Comment the next line if you want to checkin your web deploy settings
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj

# Microsoft Azure Web App publish settings. Comment the next line if you want to
# checkin your Azure Web App publish settings, but sensitive information contained
# in these scripts will be unencrypted
PublishScripts/
# TODO: Un-comment the next line if you do not want to checkin
# your web deploy settings because they may include unencrypted
# passwords
#*.pubxml
*.publishproj

# NuGet Packages
*.nupkg
@@ -170,11 +166,12 @@ csx/
ecf/
rcf/

# Windows Store app package directories and files
# Microsoft Azure ApplicationInsights config file
ApplicationInsights.config

# Windows Store app package directory
AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt

# Visual Studio cache files
# files ending in .cache can be ignored
@@ -184,6 +181,7 @@ _pkginfo.txt

# Others
ClientBin/
[Ss]tyle[Cc]op.*
~$*
*~
*.dbmdl
@@ -193,10 +191,6 @@ ClientBin/
node_modules/
orleans.codegen.cs

# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/

# RIA/Silverlight projects
Generated_Code/

@@ -240,13 +234,12 @@ FakesAssemblies/
**/*.Server/ModelManifest.xml
_Pvt_Extensions

# LightSwitch generated files
GeneratedArtifacts/
ModelManifest.xml

# Paket dependency manager
.paket/paket.exe
paket-files/

# FAKE - F# Make
.fake/

# JetBrains Rider
.idea/
*.sln.iml
.fake/

+ 23
- 0
Frameworks/MQTTnet.NetCore/MQTTnet.NetCore.csproj 查看文件

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<Authors>Christian Kratky</Authors>
<Company>Christian Kratky</Company>
<Product>MQTTnet</Product>
<Description>MQTTnet for .NET Core</Description>
<Copyright>Copyright © Christian Kratky 2016-2017</Copyright>
<Version>2.0.4.0</Version>
<AssemblyVersion>2.0.4.0</AssemblyVersion>
<FileVersion>2.0.4.0</FileVersion>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<AssemblyName>MQTTnet</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace>
<PackageId>MQTTnet</PackageId>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" />
</ItemGroup>

</Project>

MQTTnet.NET/MqttClientFactory.cs → Frameworks/MQTTnet.NetCore/MqttClientFactory.cs 查看文件

@@ -3,7 +3,7 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet.NETFramework
namespace MQTTnet
{
public class MqttClientFactory
{
@@ -11,7 +11,7 @@ namespace MQTTnet.NETFramework
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
}
}
}

+ 56
- 0
Frameworks/MQTTnet.NetCore/MqttServerAdapter.cs 查看文件

@@ -0,0 +1,56 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable
{
private CancellationTokenSource _cancellationTokenSource;
private Socket _socket;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public void Start(MqttServerOptions options)
{
if (_socket != null) throw new InvalidOperationException("Server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
_socket.Bind(new IPEndPoint(IPAddress.Any, options.Port));
_socket.Listen(options.ConnectionBacklog);
Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
}

public void Stop()
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_socket?.Dispose();
_socket = null;
}

public void Dispose()
{
Stop();
}

private async Task AcceptConnectionsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var clientSocket = await _socket.AcceptAsync();
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket), new DefaultMqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
}
}
}

+ 15
- 0
Frameworks/MQTTnet.NetCore/MqttServerFactory.cs 查看文件

@@ -0,0 +1,15 @@
using System;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttServer(options, new MqttServerAdapter());
}
}
}

+ 81
- 0
Frameworks/MQTTnet.NetCore/MqttTcpChannel.cs 查看文件

@@ -0,0 +1,81 @@
using System;
using System.Net.Sockets;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;

namespace MQTTnet
{
public class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly Socket _socket;

public MqttTcpChannel()
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

public MqttTcpChannel(Socket socket)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
}

public async Task ConnectAsync(MqttClientOptions options)
{
try
{
await _socket.ConnectAsync(options.Server, options.Port);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task DisconnectAsync()
{
try
{
_socket.Dispose();
await Task.FromResult(0);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try
{
await _socket.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task ReadAsync(byte[] buffer)
{
try
{
var buffer2 = new ArraySegment<byte>(buffer);
await _socket.ReceiveAsync(buffer2, SocketFlags.None);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public void Dispose()
{
_socket?.Dispose();
}
}
}

MQTTnet.NET/MQTTnet.NETFramework.csproj → Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj 查看文件

@@ -7,10 +7,11 @@
<ProjectGuid>{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MQTTnet.NETFramework</RootNamespace>
<AssemblyName>MQTTnet.NETFramework</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<RootNamespace>MQTTnet</RootNamespace>
<AssemblyName>MQTTnet</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -20,6 +21,7 @@
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
@@ -28,19 +30,21 @@
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="MqttClientFactory.cs" />
<Compile Include="MqttServerAdapter.cs" />
<Compile Include="MqttServerFactory.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="MqttTcpChannel.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MQTT.NET.Core\MQTTnet.Core.csproj">
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj">
<Project>{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}</Project>
<Name>MQTTnet.Core</Name>
</ProjectReference>

MQTTnet.Universal/MqttClientFactory.cs → Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs 查看文件

@@ -3,7 +3,7 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Universal
namespace MQTTnet
{
public class MqttClientFactory
{
@@ -11,7 +11,7 @@ namespace MQTTnet.Universal
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
}
}
}

+ 56
- 0
Frameworks/MQTTnet.NetFramework/MqttServerAdapter.cs 查看文件

@@ -0,0 +1,56 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable
{
private CancellationTokenSource _cancellationTokenSource;
private Socket _socket;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public void Start(MqttServerOptions options)
{
if (_socket != null) throw new InvalidOperationException("Server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
_socket.Bind(new IPEndPoint(IPAddress.Any, options.Port));
_socket.Listen(options.ConnectionBacklog);
Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
}

public void Stop()
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_socket?.Dispose();
_socket = null;
}

public void Dispose()
{
Stop();
}

private async Task AcceptConnectionsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket), new DefaultMqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
}
}
}

+ 15
- 0
Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs 查看文件

@@ -0,0 +1,15 @@
using System;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttServer(options, new MqttServerAdapter());
}
}
}

+ 85
- 0
Frameworks/MQTTnet.NetFramework/MqttTcpChannel.cs 查看文件

@@ -0,0 +1,85 @@
using System;
using System.Net.Sockets;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;

namespace MQTTnet
{
public class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly Socket _socket;

public MqttTcpChannel()
{
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
}

public MqttTcpChannel(Socket socket)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
}

public async Task ConnectAsync(MqttClientOptions options)
{
try
{
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task DisconnectAsync()
{
try
{
await Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try
{
await Task.Factory.FromAsync(
// ReSharper disable once AssignNullToNotNullAttribute
_socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null),
_socket.EndSend);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public async Task ReadAsync(byte[] buffer)
{
try
{
await Task.Factory.FromAsync(
// ReSharper disable once AssignNullToNotNullAttribute
_socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null),
_socket.EndReceive);
}
catch (SocketException exception)
{
throw new MqttCommunicationException(exception);
}
}

public void Dispose()
{
_socket?.Dispose();
}
}
}

MQTTnet.NET/Properties/AssemblyInfo.cs → Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs 查看文件

@@ -1,8 +1,8 @@
using System.Reflection;
using System.Runtime.InteropServices;

[assembly: AssemblyTitle("MQTTnet.NETFramework")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyTitle("MQTTnet")]
[assembly: AssemblyDescription("MQTTnet for .NET Framework")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Christian Kratky")]
[assembly: AssemblyProduct("MQTTnet")]

MQTTnet.Universal/MQTTnet.Universal.csproj → Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj 查看文件

@@ -7,12 +7,12 @@
<ProjectGuid>{BD60C727-D8E8-40C3-B8E3-C95A864AE611}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MQTTnet.Universal</RootNamespace>
<AssemblyName>MQTTnet.Universal</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace>
<AssemblyName>MQTTnet</AssemblyName>
<DefaultLanguage>en-US</DefaultLanguage>
<TargetPlatformIdentifier>UAP</TargetPlatformIdentifier>
<TargetPlatformVersion>10.0.14393.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.10586.0</TargetPlatformMinVersion>
<TargetPlatformMinVersion>10.0.10240.0</TargetPlatformMinVersion>
<MinimumVisualStudioVersion>14</MinimumVisualStudioVersion>
<FileAlignment>512</FileAlignment>
<ProjectTypeGuids>{A5A43C5B-DE2A-4C0C-9213-0A381AF9435A};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
@@ -108,12 +108,14 @@
</ItemGroup>
<ItemGroup>
<Compile Include="MqttClientFactory.cs" />
<Compile Include="MqttServerAdapter.cs" />
<Compile Include="MqttServerFactory.cs" />
<Compile Include="MqttTcpChannel.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<EmbeddedResource Include="Properties\MQTTnet.Universal.rd.xml" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MQTT.NET.Core\MQTTnet.Core.csproj">
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj">
<Project>{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}</Project>
<Name>MQTTnet.Core</Name>
</ProjectReference>

+ 17
- 0
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs 查看文件

@@ -0,0 +1,17 @@
using System;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Serializer;

namespace MQTTnet
{
public class MqttClientFactory
{
public MqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
}
}
}

+ 54
- 0
Frameworks/MQTTnet.UniversalWindows/MqttServerAdapter.cs 查看文件

@@ -0,0 +1,54 @@
using System;
using System.Threading;
using Windows.Networking.Sockets;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable
{
private CancellationTokenSource _cancellationTokenSource;
private StreamSocketListener _socket;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public void Start(MqttServerOptions options)
{
if (_socket != null) throw new InvalidOperationException("Server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

_socket = new StreamSocketListener();
_socket.BindServiceNameAsync(options.Port.ToString()).AsTask().Wait();
_socket.ConnectionReceived += ConnectionReceived;
}

private void ConnectionReceived(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
{
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer());

var identifier = $"{args.Socket.Information.RemoteAddress}:{args.Socket.Information.RemotePort}";
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(identifier, clientAdapter));
}

public void Stop()
{
_cancellationTokenSource?.Dispose();

if (_socket != null)
{
_socket.ConnectionReceived -= ConnectionReceived;
}

_socket?.Dispose();
_socket = null;
}

public void Dispose()
{
Stop();
}
}
}

+ 15
- 0
Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs 查看文件

@@ -0,0 +1,15 @@
using System;
using MQTTnet.Core.Server;

namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttServer(options, new MqttServerAdapter());
}
}
}

MQTTnet.Universal/MqttTcpChannel.cs → Frameworks/MQTTnet.UniversalWindows/MqttTcpChannel.cs 查看文件

@@ -8,11 +8,21 @@ using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;
using Buffer = Windows.Storage.Streams.Buffer;

namespace MQTTnet.Universal
namespace MQTTnet
{
public sealed class MqttTcpChannel : IMqttTransportChannel, IDisposable
public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable
{
private readonly StreamSocket _socket = new StreamSocket();
private readonly StreamSocket _socket;

public MqttTcpChannel()
{
_socket = new StreamSocket();
}

public MqttTcpChannel(StreamSocket socket)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
}

public async Task ConnectAsync(MqttClientOptions options)
{

MQTTnet.Universal/Properties/AssemblyInfo.cs → Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs 查看文件

@@ -1,8 +1,8 @@
using System.Reflection;
using System.Runtime.InteropServices;

[assembly: AssemblyTitle("MQTTnet.Universal")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyTitle("MQTTnet")]
[assembly: AssemblyDescription("MQTTnet for Universal Windows")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Christian Kratky")]
[assembly: AssemblyProduct("MQTTnet")]

MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml → Frameworks/MQTTnet.UniversalWindows/Properties/MQTTnet.Universal.rd.xml 查看文件


MQTTnet.Universal/project.json → Frameworks/MQTTnet.UniversalWindows/project.json 查看文件


+ 0
- 7
MQTT.NET.Core/Client/MqttClientStatistics.cs 查看文件

@@ -1,7 +0,0 @@
namespace MQTTnet.Core.Client
{
public class MqttClientStatistics
{
public int SentPackets { get; set; }
}
}

+ 0
- 32
MQTT.NET.Core/Client/MqttPacketAwaiter.cs 查看文件

@@ -1,32 +0,0 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Client
{
public class MqttPacketAwaiter
{
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>();
private readonly Func<MqttBasePacket, bool> _packetSelector;

public MqttPacketAwaiter(Func<MqttBasePacket, bool> packetSelector)
{
if (packetSelector == null) throw new ArgumentNullException(nameof(packetSelector));

_packetSelector = packetSelector;
}

public Task<MqttBasePacket> Task => _taskCompletionSource.Task;

public bool CheckPacket(MqttBasePacket packet)
{
if (!_packetSelector(packet))
{
return false;
}

_taskCompletionSource.SetResult(packet);
return true;
}
}
}

+ 0
- 39
MQTT.NET.Core/Diagnostics/MqttTrace.cs 查看文件

@@ -1,39 +0,0 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public static class MqttTrace
{
public static event EventHandler<MqttTraceMessagePublishedEventArgs> TraceMessagePublished;

public static void Verbose(string source, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Verbose, message, null));
}

public static void Information(string source, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Information, message, null));
}

public static void Warning(string source, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, null));
}

public static void Warning(string source, Exception exception, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, exception));
}

public static void Error(string source, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, null));
}

public static void Error(string source, Exception exception, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, exception));
}
}
}

+ 0
- 21
MQTT.NET.Core/DictionaryExtensions.cs 查看文件

@@ -1,21 +0,0 @@
using System;
using System.Collections.Generic;

namespace MQTTnet.Core
{
public static class DictionaryExtensions
{
public static TValue Take<TKey, TValue>(this IDictionary<TKey, TValue> dictionary, TKey key)
{
if (dictionary == null) throw new ArgumentNullException(nameof(dictionary));

TValue value;
if (dictionary.TryGetValue(key, out value))
{
dictionary.Remove(key);
}

return value;
}
}
}

+ 0
- 6
MQTT.NET.Core/Packets/MqttBasePacket.cs 查看文件

@@ -1,6 +0,0 @@
namespace MQTTnet.Core.Packets
{
public abstract class MqttBasePacket
{
}
}

+ 0
- 7
MQTT.NET.Core/Packets/MqttPubAckPacket.cs 查看文件

@@ -1,7 +0,0 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubAckPacket : MqttBasePacket
{
public ushort PacketIdentifier { get; set; }
}
}

+ 0
- 7
MQTT.NET.Core/Packets/MqttPubCompPacket.cs 查看文件

@@ -1,7 +0,0 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubCompPacket : MqttBasePacket
{
public ushort PacketIdentifier { get; set; }
}
}

+ 0
- 7
MQTT.NET.Core/Packets/MqttPubRelPacket.cs 查看文件

@@ -1,7 +0,0 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubRelPacket : MqttBasePacket
{
public ushort PacketIdentifier { get; set; }
}
}

+ 0
- 19
MQTT.NET.Core/Packets/MqttPublishPacket.cs 查看文件

@@ -1,19 +0,0 @@
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Packets
{
public class MqttPublishPacket : MqttBasePacket
{
public ushort? PacketIdentifier { get; set; }

public bool Retain { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }

public bool Dup { get; set; }

public string Topic { get; set; }

public byte[] Payload { get; set; }
}
}

+ 0
- 12
MQTT.NET.Core/Packets/MqttSubAckPacket.cs 查看文件

@@ -1,12 +0,0 @@
using System.Collections.Generic;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Packets
{
public class MqttSubAckPacket : MqttBasePacket
{
public ushort PacketIdentifier { get; set; }

public List<MqttSubscribeReturnCode> SubscribeReturnCodes { get; set; } = new List<MqttSubscribeReturnCode>();
}
}

+ 0
- 11
MQTT.NET.Core/Packets/MqttSubscribePacket.cs 查看文件

@@ -1,11 +0,0 @@
using System.Collections.Generic;

namespace MQTTnet.Core.Packets
{
public class MqttSubscribePacket : MqttBasePacket
{
public ushort PacketIdentifier { get; set; }
public IList<TopicFilter> TopicFilters { get; set; } = new List<TopicFilter>();
}
}

MQTT.NET.Core/Adapter/IMqttAdapter.cs → MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs 查看文件

@@ -5,7 +5,7 @@ using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Adapter
{
public interface IMqttAdapter
public interface IMqttCommunicationAdapter
{
Task ConnectAsync(MqttClientOptions options, TimeSpan timeout);

@@ -13,6 +13,6 @@ namespace MQTTnet.Core.Adapter

Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout);

Task<MqttBasePacket> ReceivePacket();
Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout);
}
}

+ 14
- 0
MQTTnet.Core/Adapter/IMqttServerAdapter.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using MQTTnet.Core.Server;

namespace MQTTnet.Core.Adapter
{
public interface IMqttServerAdapter
{
event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

void Start(MqttServerOptions options);

void Stop();
}
}

MQTT.NET.Core/Adapter/MqttChannelAdapter.cs → MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs 查看文件

@@ -9,18 +9,15 @@ using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Adapter
{
public class MqttChannelAdapter : IMqttAdapter
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
{
private readonly IMqttPacketSerializer _serializer;
private readonly IMqttTransportChannel _channel;
private readonly IMqttCommunicationChannel _channel;

public MqttChannelAdapter(IMqttTransportChannel channel, IMqttPacketSerializer serializer)
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer)
{
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (serializer == null) throw new ArgumentNullException(nameof(serializer));

_channel = channel;
_serializer = serializer;
_channel = channel ?? throw new ArgumentNullException(nameof(channel));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
}

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
@@ -39,7 +36,7 @@ namespace MQTTnet.Core.Adapter

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
{
MqttTrace.Information(nameof(MqttChannelAdapter), $"Sending with timeout {timeout} >>> {packet}");
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");

bool hasTimeout;
try
@@ -58,15 +55,34 @@ namespace MQTTnet.Core.Adapter
}
}

public async Task<MqttBasePacket> ReceivePacket()
public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
{
var mqttPacket = await _serializer.DeserializeAsync(_channel);
if (mqttPacket == null)
MqttBasePacket packet;
if (timeout > TimeSpan.Zero)
{
var workerTask = _serializer.DeserializeAsync(_channel);
var timeoutTask = Task.Delay(timeout);
var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask;

if (hasTimeout)
{
throw new MqttCommunicationTimedOutException();
}

packet = workerTask.Result;
}
else
{
packet = await _serializer.DeserializeAsync(_channel);
}

if (packet == null)
{
throw new MqttProtocolViolationException("Received malformed packet.");
}

return mqttPacket;
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}");
return packet;
}
}
}

+ 17
- 0
MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs 查看文件

@@ -0,0 +1,17 @@
using System;

namespace MQTTnet.Core.Adapter
{
public class MqttClientConnectedEventArgs : EventArgs
{
public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter)
{
Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier));
ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
}

public string Identifier { get; }

public IMqttCommunicationAdapter ClientAdapter { get; }
}
}

MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs → MQTTnet.Core/Adapter/MqttConnectingFailedException.cs 查看文件


MQTT.NET.Core/Channel/IMqttTransportChannel.cs → MQTTnet.Core/Channel/IMqttCommunicationChannel.cs 查看文件

@@ -3,7 +3,7 @@ using MQTTnet.Core.Client;

namespace MQTTnet.Core.Channel
{
public interface IMqttTransportChannel
public interface IMqttCommunicationChannel
{
Task ConnectAsync(MqttClientOptions options);


MQTT.NET.Core/Client/MqttClient.cs → MQTTnet.Core/Client/MqttClient.cs 查看文件

@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -6,6 +7,7 @@ using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

@@ -13,22 +15,20 @@ namespace MQTTnet.Core.Client
{
public class MqttClient
{
private readonly Dictionary<ushort, MqttPublishPacket> _pendingExactlyOncePublishPackets = new Dictionary<ushort, MqttPublishPacket>();
private readonly ConcurrentDictionary<ushort, MqttPublishPacket> _pendingExactlyOncePublishPackets = new ConcurrentDictionary<ushort, MqttPublishPacket>();
private readonly HashSet<ushort> _processedPublishPackets = new HashSet<ushort>();

private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly MqttClientOptions _options;
private readonly IMqttAdapter _adapter;
private readonly IMqttCommunicationAdapter _adapter;

private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;

public MqttClient(MqttClientOptions options, IMqttAdapter adapter)
public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

_options = options;
_adapter = adapter;
_options = options ?? throw new ArgumentNullException(nameof(options));
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
}

public event EventHandler Connected;
@@ -56,7 +56,7 @@ namespace MQTTnet.Core.Client
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");

@@ -66,10 +66,9 @@ namespace MQTTnet.Core.Client
_packetDispatcher.Reset();
IsConnected = true;

Task.Factory.StartNew(async () => await ReceivePackets(
_cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget();
Task.Run(async () => await ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token).Forget();

var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket, p => true);
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
throw new MqttConnectingFailedException(response.ConnectReturnCode);
@@ -77,8 +76,7 @@ namespace MQTTnet.Core.Client

if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
Task.Factory.StartNew(async () => await SendKeepAliveMessagesAsync(
_cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget();
Task.Run(async () => await SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token).Forget();
}

Connected?.Invoke(this, EventArgs.Empty);
@@ -90,12 +88,11 @@ namespace MQTTnet.Core.Client
await DisconnectInternalAsync();
}

private void ThrowIfNotConnected()
public async Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters)
{
if (!IsConnected)
{
throw new MqttCommunicationException("The client is not connected.");
}
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

return await SubscribeAsync(topicFilters.ToList());
}

public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter> topicFilters)
@@ -110,8 +107,7 @@ namespace MQTTnet.Core.Client
TopicFilters = topicFilters
};

Func<MqttSubAckPacket, bool> packetSelector = p => p.PacketIdentifier == subscribePacket.PacketIdentifier;
var response = await SendAndReceiveAsync(subscribePacket, packetSelector);
var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket);

if (response.SubscribeReturnCodes.Count != topicFilters.Count)
{
@@ -127,6 +123,13 @@ namespace MQTTnet.Core.Client
return result;
}

public async Task Unsubscribe(params string[] topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

await Unsubscribe(topicFilters.ToList());
}

public async Task Unsubscribe(IList<string> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
@@ -139,8 +142,7 @@ namespace MQTTnet.Core.Client
TopicFilters = topicFilters
};

Func<MqttUnsubAckPacket, bool> packetSelector = p => p.PacketIdentifier == unsubscribePacket.PacketIdentifier;
await SendAndReceiveAsync(unsubscribePacket, packetSelector);
await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket);
}

public async Task PublishAsync(MqttApplicationMessage applicationMessage)
@@ -148,37 +150,30 @@ namespace MQTTnet.Core.Client
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
ThrowIfNotConnected();

var publishPacket = new MqttPublishPacket
{
Topic = applicationMessage.Topic,
Payload = applicationMessage.Payload,
QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel,
Retain = applicationMessage.Retain,
Dup = false
};
var publishPacket = applicationMessage.ToPublishPacket();

if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce)
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAsync(publishPacket);
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException();

Func<MqttPubAckPacket, bool> packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value;
await SendAndReceiveAsync(publishPacket, packageSelector);
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException();

Func<MqttPubRecPacket, bool> packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value;
await SendAndReceiveAsync(publishPacket, packageSelector);
await SendAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value });
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAsync(publishPacket.CreateResponse<MqttPubCompPacket>());
}
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private async Task DisconnectInternalAsync()
{
try
@@ -196,38 +191,49 @@ namespace MQTTnet.Core.Client
}
}

private async void ProcessIncomingPacket(MqttBasePacket mqttPacket)
private async void ProcessReceivedPacket(MqttBasePacket mqttPacket)
{
var publishPacket = mqttPacket as MqttPublishPacket;
if (publishPacket != null)
try
{
await ProcessReceivedPublishPacket(publishPacket);
return;
}
if (mqttPacket is MqttPingReqPacket)
{
await SendAsync(new MqttPingRespPacket());
return;
}

var pingReqPacket = mqttPacket as MqttPingReqPacket;
if (pingReqPacket != null)
{
await SendAsync(new MqttPingRespPacket());
return;
}
if (mqttPacket is MqttDisconnectPacket)
{
await DisconnectAsync();
return;
}

var publishPacket = mqttPacket as MqttPublishPacket;
if (publishPacket != null)
{
await ProcessReceivedPublishPacket(publishPacket);
return;
}

var pubRelPacket = mqttPacket as MqttPubRelPacket;
if (pubRelPacket != null)
{
await ProcessReceivedPubRelPacket(pubRelPacket);
return;
}

var pubRelPacket = mqttPacket as MqttPubRelPacket;
if (pubRelPacket != null)
_packetDispatcher.Dispatch(mqttPacket);
}
catch (Exception exception)
{
await ProcessReceivedPubRelPacket(pubRelPacket);
return;
MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet.");
}

_packetDispatcher.Dispatch(mqttPacket);
}

private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
{
if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce)
{
if (publishPacket.PacketIdentifier == null) throw new InvalidOperationException();
_processedPublishPackets.Add(publishPacket.PacketIdentifier.Value);
_processedPublishPackets.Add(publishPacket.PacketIdentifier);
}

var applicationMessage = new MqttApplicationMessage(
@@ -240,15 +246,6 @@ namespace MQTTnet.Core.Client
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage));
}

private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
var originalPublishPacket = _pendingExactlyOncePublishPackets.Take(pubRelPacket.PacketIdentifier);
if (originalPublishPacket == null) throw new MqttCommunicationException();
await SendAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });

FireApplicationMessageReceivedEvent(originalPublishPacket);
}

private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
{
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
@@ -257,43 +254,63 @@ namespace MQTTnet.Core.Client
}
else
{
if (!publishPacket.PacketIdentifier.HasValue) { throw new InvalidOperationException(); }

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
FireApplicationMessageReceivedEvent(publishPacket);
await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value });
await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
_pendingExactlyOncePublishPackets.Add(publishPacket.PacketIdentifier.Value, publishPacket);
await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value });
_pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket;
await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}
}
}

private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
MqttPublishPacket originalPublishPacket;
if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket))
{
throw new MqttCommunicationException();
}

await SendAsync(originalPublishPacket.CreateResponse<MqttPubCompPacket>());

FireApplicationMessageReceivedEvent(originalPublishPacket);
}

private async Task SendAsync(MqttBasePacket packet)
{
await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
}

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(
MqttBasePacket requestPacket, Func<TResponsePacket, bool> responsePacketSelector) where TResponsePacket : MqttBasePacket
private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
Func<MqttBasePacket, bool> selector = p =>
Func<MqttBasePacket, bool> responsePacketSelector = p =>
{
var p1 = p as TResponsePacket;
return p1 != null && responsePacketSelector(p1);
};
if (p1 == null)
{
return false;
}

return (TResponsePacket)await SendAndReceiveAsync(requestPacket, selector);
}
var pi1 = requestPacket as IPacketWithIdentifier;
var pi2 = p as IPacketWithIdentifier;

if (pi1 != null && pi2 != null)
{
if (pi1.PacketIdentifier != pi2.PacketIdentifier)
{
return false;
}
}

return true;
};

private async Task<MqttBasePacket> SendAndReceiveAsync(MqttBasePacket requestPacket, Func<MqttBasePacket, bool> responsePacketSelector)
{
var waitTask = _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout);
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
return await waitTask;
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout);
}

private ushort GetNewPacketIdentifier()
@@ -310,16 +327,20 @@ namespace MQTTnet.Core.Client
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_options.KeepAlivePeriod, cancellationToken);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), p => true);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket());
}
}
catch (MqttCommunicationException)
{
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClient), exception, "Error while sending keep alive packets.");
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
}
finally
{
MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets.");
await DisconnectInternalAsync();
}
}

@@ -330,20 +351,23 @@ namespace MQTTnet.Core.Client
{
while (!cancellationToken.IsCancellationRequested)
{
var mqttPacket = await _adapter.ReceivePacket();
MqttTrace.Information(nameof(MqttChannelAdapter), $"Received <<< {mqttPacket}");
var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}");

Task.Run(() => ProcessIncomingPacket(mqttPacket), cancellationToken).Forget();
Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget();
}
}
catch (MqttCommunicationException)
{
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets.");
await DisconnectInternalAsync();
}
finally
{
MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets.");
await DisconnectInternalAsync();
}
}
}

MQTT.NET.Core/Client/MqttClientOptions.cs → MQTTnet.Core/Client/MqttClientOptions.cs 查看文件


+ 16
- 0
MQTTnet.Core/Client/MqttPacketAwaiter.cs 查看文件

@@ -0,0 +1,16 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Client
{
public class MqttPacketAwaiter : TaskCompletionSource<MqttBasePacket>
{
public MqttPacketAwaiter(Func<MqttBasePacket, bool> packetSelector)
{
PacketSelector = packetSelector ?? throw new ArgumentNullException(nameof(packetSelector));
}

public Func<MqttBasePacket, bool> PacketSelector { get; }
}
}

MQTT.NET.Core/Client/MqttPacketDispatcher.cs → MQTTnet.Core/Client/MqttPacketDispatcher.cs 查看文件

@@ -9,25 +9,27 @@ namespace MQTTnet.Core.Client
{
public class MqttPacketDispatcher
{
private readonly object _syncRoot = new object();
private readonly List<MqttBasePacket> _receivedPackets = new List<MqttBasePacket>();
private readonly List<MqttPacketAwaiter> _packetAwaiters = new List<MqttPacketAwaiter>();

public async Task<MqttBasePacket> WaitForPacketAsync(Func<MqttBasePacket, bool> selector, TimeSpan timeout)
{
if (selector == null) throw new ArgumentNullException(nameof(selector));

var waitHandle = new MqttPacketAwaiter(selector);
AddPacketAwaiter(waitHandle);
var packetAwaiter = AddPacketAwaiter(selector);
DispatchPendingPackets();

var hasTimeout = await Task.WhenAny(Task.Delay(timeout), waitHandle.Task) != waitHandle.Task;
RemovePacketAwaiter(waitHandle);
var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task) != packetAwaiter.Task;
RemovePacketAwaiter(packetAwaiter);

if (hasTimeout)
{
MqttTrace.Error(nameof(MqttPacketDispatcher), $"Timeout while waiting for packet.");
MqttTrace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet.");
throw new MqttCommunicationTimedOutException();
}

return waitHandle.Task.Result;
return packetAwaiter.Task.Result;
}

public void Dispatch(MqttBasePacket packet)
@@ -37,15 +39,33 @@ namespace MQTTnet.Core.Client
var packetDispatched = false;
foreach (var packetAwaiter in GetPacketAwaiters())
{
if (packetAwaiter.CheckPacket(packet))
if (packetAwaiter.PacketSelector(packet))
{
packetAwaiter.SetResult(packet);
packetDispatched = true;
break;
}
}

if (!packetDispatched)
lock (_syncRoot)
{
MqttTrace.Warning(nameof(MqttPacketDispatcher), $"Received packet '{packet}' not dispatched.");
if (!packetDispatched)
{
_receivedPackets.Add(packet);
}
else
{
_receivedPackets.Remove(packet);
}
}
}

public void Reset()
{
lock (_syncRoot)
{
_packetAwaiters.Clear();
_receivedPackets.Clear();
}
}

@@ -57,27 +77,35 @@ namespace MQTTnet.Core.Client
}
}

private void AddPacketAwaiter(MqttPacketAwaiter packetAwaiter)
private MqttPacketAwaiter AddPacketAwaiter(Func<MqttBasePacket, bool> selector)
{
lock (_packetAwaiters)
lock (_syncRoot)
{
var packetAwaiter = new MqttPacketAwaiter(selector);
_packetAwaiters.Add(packetAwaiter);
return packetAwaiter;
}
}

private void RemovePacketAwaiter(MqttPacketAwaiter packetAwaiter)
{
lock (_packetAwaiters)
lock (_syncRoot)
{
_packetAwaiters.Remove(packetAwaiter);
}
}

public void Reset()
private void DispatchPendingPackets()
{
lock (_packetAwaiters)
List<MqttBasePacket> receivedPackets;
lock (_syncRoot)
{
_packetAwaiters.Clear();
receivedPackets = new List<MqttBasePacket>(_receivedPackets);
}

foreach (var pendingPacket in receivedPackets)
{
Dispatch(pendingPacket);
}
}
}

MQTT.NET.Core/Client/MqttSubscribeResult.cs → MQTTnet.Core/Client/MqttSubscribeResult.cs 查看文件


+ 44
- 0
MQTTnet.Core/Diagnostics/MqttTrace.cs 查看文件

@@ -0,0 +1,44 @@
using System;

namespace MQTTnet.Core.Diagnostics
{
public static class MqttTrace
{
public static event EventHandler<MqttTraceMessagePublishedEventArgs> TraceMessagePublished;

public static void Verbose(string source, string message)
{
Publish(source, MqttTraceLevel.Verbose, null, message);
}

public static void Information(string source, string message)
{
Publish(source, MqttTraceLevel.Information, null, message);
}

public static void Warning(string source, string message)
{
Publish(source, MqttTraceLevel.Warning, null, message);
}

public static void Warning(string source, Exception exception, string message)
{
Publish(source, MqttTraceLevel.Warning, exception, message);
}

public static void Error(string source, string message)
{
Publish(source, MqttTraceLevel.Error, null, message);
}

public static void Error(string source, Exception exception, string message)
{
Publish(source, MqttTraceLevel.Error, exception, message);
}

private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message)
{
TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception));
}
}
}

MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs → MQTTnet.Core/Diagnostics/MqttTraceLevel.cs 查看文件


MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs → MQTTnet.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs 查看文件


MQTT.NET.Core/Exceptions/MqttCommunicationException.cs → MQTTnet.Core/Exceptions/MqttCommunicationException.cs 查看文件


MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs → MQTTnet.Core/Exceptions/MqttCommunicationTimedOutException.cs 查看文件


MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs → MQTTnet.Core/Exceptions/MqttProtocolViolationException.cs 查看文件

@@ -4,9 +4,9 @@ namespace MQTTnet.Core.Exceptions
{
public class MqttProtocolViolationException : Exception
{
public MqttProtocolViolationException(string message) : base(message)
public MqttProtocolViolationException(string message)
: base(message)
{

}
}
}

+ 24
- 0
MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs 查看文件

@@ -0,0 +1,24 @@
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Internal
{
internal static class MqttApplicationMessageExtensions
{
public static MqttPublishPacket ToPublishPacket(this MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null)
{
return null;
}

return new MqttPublishPacket
{
Topic = applicationMessage.Topic,
Payload = applicationMessage.Payload,
QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel,
Retain = applicationMessage.Retain,
Dup = false
};
}
}
}

MQTT.NET.Core/TaskExtensions.cs → MQTTnet.Core/Internal/TaskExtensions.cs 查看文件

@@ -1,8 +1,8 @@
using System.Threading.Tasks;

namespace MQTTnet.Core
namespace MQTTnet.Core.Internal
{
public static class TaskExtensions
internal static class TaskExtensions
{
public static void Forget(this Task task)
{

MQTT.NET.Core/MQTTnet.Core.csproj → MQTTnet.Core/MQTTnet.Core.csproj 查看文件

@@ -37,22 +37,24 @@
<!-- A reference to the entire .NET Framework is automatically included -->
</ItemGroup>
<ItemGroup>
<Compile Include="Adapter\IMqttAdapter.cs" />
<Compile Include="Adapter\IMqttCommunicationAdapter.cs" />
<Compile Include="Adapter\IMqttServerAdapter.cs" />
<Compile Include="Adapter\MqttClientConnectedEventArgs.cs" />
<Compile Include="Adapter\MqttConnectingFailedException.cs" />
<Compile Include="Adapter\MqttChannelAdapter.cs" />
<Compile Include="Channel\IMqttTransportChannel.cs" />
<Compile Include="Client\MqttClientStatistics.cs" />
<Compile Include="Adapter\MqttChannelCommunicationAdapter.cs" />
<Compile Include="Channel\IMqttCommunicationChannel.cs" />
<Compile Include="Client\MqttPacketDispatcher.cs" />
<Compile Include="Client\MqttSubscribeResult.cs" />
<Compile Include="Client\MqttPacketAwaiter.cs" />
<Compile Include="Diagnostics\MqttTrace.cs" />
<Compile Include="Diagnostics\MqttTraceLevel.cs" />
<Compile Include="Diagnostics\MqttTraceMessagePublishedEventArgs.cs" />
<Compile Include="DictionaryExtensions.cs" />
<Compile Include="Exceptions\MqttCommunicationException.cs" />
<Compile Include="Exceptions\MqttCommunicationTimedOutException.cs" />
<Compile Include="Exceptions\MqttProtocolViolationException.cs" />
<Compile Include="Internal\MqttApplicationMessageExtensions.cs" />
<Compile Include="MqttApplicationMessageReceivedEventArgs.cs" />
<Compile Include="Packets\IPacketWithPacketIdentifier.cs" />
<Compile Include="Packets\MqttConnAckPacket.cs" />
<Compile Include="Packets\MqttBasePacket.cs" />
<Compile Include="Packets\MqttConnectPacket.cs" />
@@ -62,6 +64,7 @@
<Compile Include="Client\MqttClientOptions.cs" />
<Compile Include="Packets\MqttPingReqPacket.cs" />
<Compile Include="Packets\MqttPubCompPacket.cs" />
<Compile Include="Packets\MqttBasePublishPacket.cs" />
<Compile Include="Packets\MqttUnsubAckPacket.cs" />
<Compile Include="Packets\MqttSubAckPacket.cs" />
<Compile Include="Packets\MqttPubRelPacket.cs" />
@@ -83,7 +86,15 @@
<Compile Include="Serializer\IMqttPacketSerializer.cs" />
<Compile Include="Serializer\MqttPacketReader.cs" />
<Compile Include="Serializer\MqttPacketWriter.cs" />
<Compile Include="TaskExtensions.cs" />
<Compile Include="Server\MqttClientSession.cs" />
<Compile Include="Server\MqttClientSessionManager.cs" />
<Compile Include="Server\MqttOutgoingPublicationsManager.cs" />
<Compile Include="Server\MqttServer.cs" />
<Compile Include="Server\MqttServerOptions.cs" />
<Compile Include="Server\MqttClientPublishPacketContext.cs" />
<Compile Include="Server\MqttTopicFilterComparer.cs" />
<Compile Include="Server\MqttClientSubscriptionsManager.cs" />
<Compile Include="Internal\TaskExtensions.cs" />
</ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\Portable\$(TargetFrameworkVersion)\Microsoft.Portable.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.

MQTT.NET.Core/MqttApplicationMessage.cs → MQTTnet.Core/MqttApplicationMessage.cs 查看文件

@@ -7,11 +7,8 @@ namespace MQTTnet.Core
{
public MqttApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (payload == null) throw new ArgumentNullException(nameof(payload));

Topic = topic;
Payload = payload;
Topic = topic ?? throw new ArgumentNullException(nameof(topic));
Payload = payload ?? throw new ArgumentNullException(nameof(payload));
QualityOfServiceLevel = qualityOfServiceLevel;
Retain = retain;
}

MQTT.NET.Core/MqttApplicationMessageReceivedEventArgs.cs → MQTTnet.Core/MqttApplicationMessageReceivedEventArgs.cs 查看文件

@@ -6,9 +6,7 @@ namespace MQTTnet.Core
{
public MqttApplicationMessageReceivedEventArgs(MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

ApplicationMessage = applicationMessage;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
}

public MqttApplicationMessage ApplicationMessage { get; }

+ 7
- 0
MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs 查看文件

@@ -0,0 +1,7 @@
namespace MQTTnet.Core.Packets
{
public interface IPacketWithIdentifier
{
ushort PacketIdentifier { get; set; }
}
}

+ 25
- 0
MQTTnet.Core/Packets/MqttBasePacket.cs 查看文件

@@ -0,0 +1,25 @@
using System;

namespace MQTTnet.Core.Packets
{
public abstract class MqttBasePacket
{
public TResponsePacket CreateResponse<TResponsePacket>()
{
var responsePacket = Activator.CreateInstance<TResponsePacket>();
var responsePacketWithIdentifier = responsePacket as IPacketWithIdentifier;
if (responsePacketWithIdentifier != null)
{
var requestPacketWithIdentifier = this as IPacketWithIdentifier;
if (requestPacketWithIdentifier == null)
{
throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
}

responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
}

return responsePacket;
}
}
}

MQTT.NET.Core/Packets/MqttPubRecPacket.cs → MQTTnet.Core/Packets/MqttBasePublishPacket.cs 查看文件

@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubRecPacket : MqttBasePacket
public class MqttBasePublishPacket : MqttBasePacket, IPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}

MQTT.NET.Core/Packets/MqttConnAckPacket.cs → MQTTnet.Core/Packets/MqttConnAckPacket.cs 查看文件


MQTT.NET.Core/Packets/MqttConnectPacket.cs → MQTTnet.Core/Packets/MqttConnectPacket.cs 查看文件

@@ -13,5 +13,10 @@
public bool CleanSession { get; set; }

public MqttApplicationMessage WillMessage { get; set; }

public override string ToString()
{
return $"{nameof(MqttConnectPacket)} [ClientId={ClientId}] [Username={Username}] [Password={Password}] [KeepAlivePeriod={KeepAlivePeriod}] [CleanSession={CleanSession}]";
}
}
}

MQTT.NET.Core/Packets/MqttDisconnectPacket.cs → MQTTnet.Core/Packets/MqttDisconnectPacket.cs 查看文件


MQTT.NET.Core/Packets/MqttPingReqPacket.cs → MQTTnet.Core/Packets/MqttPingReqPacket.cs 查看文件

@@ -1,6 +1,4 @@
using System.Xml;

namespace MQTTnet.Core.Packets
namespace MQTTnet.Core.Packets
{
public class MqttPingReqPacket : MqttBasePacket
{

MQTT.NET.Core/Packets/MqttPingRespPacket.cs → MQTTnet.Core/Packets/MqttPingRespPacket.cs 查看文件


+ 6
- 0
MQTTnet.Core/Packets/MqttPubAckPacket.cs 查看文件

@@ -0,0 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubAckPacket : MqttBasePublishPacket
{
}
}

+ 6
- 0
MQTTnet.Core/Packets/MqttPubCompPacket.cs 查看文件

@@ -0,0 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubCompPacket : MqttBasePublishPacket
{
}
}

+ 6
- 0
MQTTnet.Core/Packets/MqttPubRecPacket.cs 查看文件

@@ -0,0 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubRecPacket : MqttBasePublishPacket
{
}
}

+ 6
- 0
MQTTnet.Core/Packets/MqttPubRelPacket.cs 查看文件

@@ -0,0 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttPubRelPacket : MqttBasePublishPacket
{
}
}

+ 24
- 0
MQTTnet.Core/Packets/MqttPublishPacket.cs 查看文件

@@ -0,0 +1,24 @@
using System.Text;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Packets
{
public class MqttPublishPacket : MqttBasePublishPacket
{
public bool Retain { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }

public bool Dup { get; set; }

public string Topic { get; set; }

public byte[] Payload { get; set; }

public override string ToString()
{
return
$"{nameof(MqttPublishPacket)} [Topic={Topic}] [Payload={Encoding.UTF8.GetString(Payload, 0, Payload.Length)}] [QoSLevel={QualityOfServiceLevel}] [Dup={Dup}] [Retain={Retain}] [PacketIdentifier={PacketIdentifier}]";
}
}
}

+ 20
- 0
MQTTnet.Core/Packets/MqttSubAckPacket.cs 查看文件

@@ -0,0 +1,20 @@
using System.Collections.Generic;
using System.Linq;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Packets
{
public class MqttSubAckPacket : MqttBasePacket, IPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

public IList<MqttSubscribeReturnCode> SubscribeReturnCodes { get; set; } = new List<MqttSubscribeReturnCode>();

public override string ToString()
{
var subscribeReturnCodesText = string.Join(",", SubscribeReturnCodes.Select(f => f.ToString()));
return
$"{nameof(MqttSubAckPacket)} [PacketIdentifier={PacketIdentifier}] [SubscribeReturnCodes={subscribeReturnCodesText}]";
}
}
}

+ 19
- 0
MQTTnet.Core/Packets/MqttSubscribePacket.cs 查看文件

@@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Linq;

namespace MQTTnet.Core.Packets
{
public class MqttSubscribePacket : MqttBasePacket, IPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public IList<TopicFilter> TopicFilters { get; set; } = new List<TopicFilter>();

public override string ToString()
{
var topicFiltersText = string.Join(",", TopicFilters.Select(f => $"{f.Topic}@{f.QualityOfServiceLevel}"));
return
$"{nameof(MqttSubscribePacket)} [PacketIdentifier={PacketIdentifier}] [TopicFilters={topicFiltersText}]";
}
}
}

MQTT.NET.Core/Packets/MqttUnsubAckPacket.cs → MQTTnet.Core/Packets/MqttUnsubAckPacket.cs 查看文件

@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttUnsubAckPacket : MqttBasePacket
public class MqttUnsubAckPacket : MqttBasePacket, IPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}

MQTT.NET.Core/Packets/MqttUnsubscribe.cs → MQTTnet.Core/Packets/MqttUnsubscribe.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Core.Packets
{
public class MqttUnsubscribePacket : MqttBasePacket
public class MqttUnsubscribePacket : MqttBasePacket, IPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

MQTT.NET.Core/Packets/TopicFilter.cs → MQTTnet.Core/Packets/TopicFilter.cs 查看文件


MQTT.NET.Core/Properties/AssemblyInfo.cs → MQTTnet.Core/Properties/AssemblyInfo.cs 查看文件

@@ -1,8 +1,8 @@
using System.Reflection;
using System.Runtime.InteropServices;

[assembly: AssemblyTitle("MQTTnet.Core")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyTitle("MQTTnet")]
[assembly: AssemblyDescription("The core library of MQTTnet")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Christian Kratky")]
[assembly: AssemblyProduct("MQTTnet")]

MQTT.NET.Core/Protocol/MqttConnectReturnCode.cs → MQTTnet.Core/Protocol/MqttConnectReturnCode.cs 查看文件


MQTT.NET.Core/Protocol/MqttControlPacketType.cs → MQTTnet.Core/Protocol/MqttControlPacketType.cs 查看文件


MQTT.NET.Core/Protocol/MqttQualityOfServiceLevel.cs → MQTTnet.Core/Protocol/MqttQualityOfServiceLevel.cs 查看文件


MQTT.NET.Core/Protocol/MqttSubscribeReturnCode.cs → MQTTnet.Core/Protocol/MqttSubscribeReturnCode.cs 查看文件


MQTT.NET.Core/Serializer/ByteReader.cs → MQTTnet.Core/Serializer/ByteReader.cs 查看文件


MQTT.NET.Core/Serializer/ByteWriter.cs → MQTTnet.Core/Serializer/ByteWriter.cs 查看文件


MQTT.NET.Core/Serializer/DefaultMqttV311PacketSerializer.cs → MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs 查看文件

@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Serializer
{
public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer
{
public async Task SerializeAsync(MqttBasePacket packet, IMqttTransportChannel destination)
public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination));
@@ -119,8 +119,10 @@ namespace MQTTnet.Core.Serializer
throw new MqttProtocolViolationException("Packet type invalid.");
}

public async Task<MqttBasePacket> DeserializeAsync(IMqttTransportChannel source)
public async Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source)
{
if (source == null) throw new ArgumentNullException(nameof(source));

using (var mqttPacketReader = new MqttPacketReader(source))
{
await mqttPacketReader.ReadToEndAsync();
@@ -211,10 +213,13 @@ namespace MQTTnet.Core.Serializer
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync()
};
}

default:
{
throw new ProtocolViolationException();
}
}
}

throw new ProtocolViolationException();
}

private async Task<MqttBasePacket> DeserializeUnsubscribeAsync(MqttPacketReader reader)
@@ -258,8 +263,8 @@ namespace MQTTnet.Core.Serializer

var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync();

ushort? packetIdentifier = null;
if (qualityOfServiceLevel > 0)
ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
packetIdentifier = await reader.ReadRemainingDataUShortAsync();
}
@@ -378,7 +383,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttConnectPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{
ValidateConnectPacket(packet);

@@ -437,7 +442,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttConnAckPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -452,22 +457,22 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{
await SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
}

private async Task SerializeAsync(MqttPingReqPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
{
await SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
}

private async Task SerializeAsync(MqttPingRespPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
{
await SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
}

private async Task SerializeAsync(MqttPublishPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
{
ValidatePublishPacket(packet);

@@ -475,18 +480,13 @@ namespace MQTTnet.Core.Serializer
{
output.WriteWithLengthPrefix(packet.Topic);

if (packet.QualityOfServiceLevel > 0)
if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("Packet identifier must be set if QoS > 0 [MQTT-2.3.1-1].");
}

output.Write(packet.PacketIdentifier.Value);
output.Write(packet.PacketIdentifier);
}
else
{
if (packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier > 0)
{
throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
}
@@ -507,7 +507,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttPubAckPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -518,7 +518,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttPubRecPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -529,7 +529,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttPubRelPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -540,7 +540,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttPubCompPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -551,7 +551,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttSubscribePacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -571,7 +571,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttSubAckPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -590,7 +590,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -609,7 +609,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttTransportChannel destination)
private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -620,7 +620,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttTransportChannel destination)
private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{

MQTT.NET.Core/Serializer/IMqttPacketSerializer.cs → MQTTnet.Core/Serializer/IMqttPacketSerializer.cs 查看文件

@@ -6,8 +6,8 @@ namespace MQTTnet.Core.Serializer
{
public interface IMqttPacketSerializer
{
Task SerializeAsync(MqttBasePacket mqttPacket, IMqttTransportChannel destination);
Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination);

Task<MqttBasePacket> DeserializeAsync(IMqttTransportChannel source);
Task<MqttBasePacket> DeserializeAsync(IMqttCommunicationChannel source);
}
}

MQTT.NET.Core/Serializer/MqttPacketReader.cs → MQTTnet.Core/Serializer/MqttPacketReader.cs 查看文件

@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading.Tasks;
@@ -12,13 +11,11 @@ namespace MQTTnet.Core.Serializer
public sealed class MqttPacketReader : IDisposable
{
private readonly MemoryStream _remainingData = new MemoryStream();
private readonly IMqttTransportChannel _source;
private readonly IMqttCommunicationChannel _source;

public MqttPacketReader(IMqttTransportChannel source)
public MqttPacketReader(IMqttCommunicationChannel source)
{
if (source == null) throw new ArgumentNullException(nameof(source));

_source = source;
_source = source ?? throw new ArgumentNullException(nameof(source));
}

public MqttControlPacketType ControlPacketType { get; private set; }

MQTT.NET.Core/Serializer/MqttPacketWriter.cs → MQTTnet.Core/Serializer/MqttPacketWriter.cs 查看文件

@@ -99,7 +99,7 @@ namespace MQTTnet.Core.Serializer
_buffer?.Dispose();
}

public async Task WriteToAsync(IMqttTransportChannel destination)
public async Task WriteToAsync(IMqttCommunicationChannel destination)
{
await destination.WriteAsync(_buffer.ToArray());
}

+ 22
- 0
MQTTnet.Core/Server/MqttClientPublishPacketContext.cs 查看文件

@@ -0,0 +1,22 @@
using System;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public class MqttClientPublishPacketContext
{
public MqttClientPublishPacketContext(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
SenderClientSession = senderClientSession ?? throw new ArgumentNullException(nameof(senderClientSession));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
}

public MqttClientSession SenderClientSession { get; }

public MqttPublishPacket PublishPacket { get; }

public int SendTries { get; set; }

public bool IsSent { get; set; }
}
}

+ 179
- 0
MQTTnet.Core/Server/MqttClientSession.cs 查看文件

@@ -0,0 +1,179 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Server
{
public class MqttClientSession
{
private readonly ConcurrentDictionary<ushort, MqttPublishPacket> _pendingIncomingPublications = new ConcurrentDictionary<ushort, MqttPublishPacket>();

private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
private readonly MqttOutgoingPublicationsManager _outgoingPublicationsManager;
private readonly Action<MqttClientSession, MqttPublishPacket> _publishPacketReceivedCallback;
private readonly MqttServerOptions _options;

private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private string _identifier;
private MqttApplicationMessage _willApplicationMessage;
public MqttClientSession(MqttServerOptions options, Action<MqttClientSession, MqttPublishPacket> publishPacketReceivedCallback)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback));
_outgoingPublicationsManager = new MqttOutgoingPublicationsManager(options);
}

public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

_willApplicationMessage = willApplicationMessage;

try
{
_identifier = identifier;
_adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource();

_outgoingPublicationsManager.Start(adapter);
while (!_cancellationTokenSource.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero);
await HandleIncomingPacketAsync(packet);
}
}
catch (MqttCommunicationException)
{
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientSession), exception, $"Client '{_identifier}': Unhandled exception while processing client packets.");
}
finally
{
if (willApplicationMessage != null)
{
_publishPacketReceivedCallback(this, _willApplicationMessage.ToPublishPacket());
}
_outgoingPublicationsManager.Stop();
_cancellationTokenSource.Cancel();
_adapter = null;

MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected.");
}
}

public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

if (!_subscriptionsManager.IsTopicSubscribed(publishPacket))
{
return;
}

_outgoingPublicationsManager.Enqueue(senderClientSession, publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet.");
}

private async Task HandleIncomingPacketAsync(MqttBasePacket packet)
{
var subscribePacket = packet as MqttSubscribePacket;
if (subscribePacket != null)
{
await _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
return;
}

var unsubscribePacket = packet as MqttUnsubscribePacket;
if (unsubscribePacket != null)
{
await _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
return;
}

var publishPacket = packet as MqttPublishPacket;
if (publishPacket != null)
{
await HandleIncomingPublishPacketAsync(publishPacket);
return;
}

var pubRelPacket = packet as MqttPubRelPacket;
if (pubRelPacket != null)
{
await HandleIncomingPubRelPacketAsync(pubRelPacket);
return;
}

var pubAckPacket = packet as MqttPubAckPacket;
if (pubAckPacket != null)
{
await HandleIncomingPubAckPacketAsync(pubAckPacket);
return;
}

if (packet is MqttPingReqPacket)
{
await _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
return;
}

if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
_cancellationTokenSource.Cancel();
return;
}

MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection.");
_cancellationTokenSource.Cancel();
}

private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket)
{
await Task.FromResult(0);
}

private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
{
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
_publishPacketReceivedCallback(this, publishPacket);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
await _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
_publishPacketReceivedCallback(this, publishPacket);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
_pendingIncomingPublications[publishPacket.PacketIdentifier] = publishPacket;
await _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}
}

private async Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
{
MqttPublishPacket publishPacket;
if (!_pendingIncomingPublications.TryRemove(pubRelPacket.PacketIdentifier, out publishPacket))
{
return;
}

await _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
_publishPacketReceivedCallback(this, publishPacket);
}
}
}

+ 93
- 0
MQTTnet.Core/Server/MqttClientSessionManager.cs 查看文件

@@ -0,0 +1,93 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Server
{
public class MqttClientSessionManager
{
private readonly ConcurrentDictionary<string, MqttClientSession> _clientSessions = new ConcurrentDictionary<string, MqttClientSession>();
private readonly MqttServerOptions _options;

public MqttClientSessionManager(MqttServerOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
{
try
{
var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket;
if (connectPacket == null)
{
throw new MqttProtocolViolationException("The first packet from a client must be a 'Connect' packet [MQTT-3.1.0-1].");
}

var connectReturnCode = MqttConnectReturnCode.ConnectionAccepted;
if (_options.ConnectionValidator != null)
{
connectReturnCode = _options.ConnectionValidator(connectPacket);
}

MqttClientSession clientSession = null;
var isSessionPresent = _clientSessions.ContainsKey(connectPacket.ClientId);
if (isSessionPresent && connectPacket.CleanSession)
{
MqttClientSession _;
_clientSessions.TryRemove(connectPacket.ClientId, out _);
}
else if (!connectPacket.CleanSession)
{
_clientSessions.TryGetValue(connectPacket.ClientId, out clientSession);
}

await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession != null
}, _options.DefaultCommunicationTimeout);

if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
return;
}

if (clientSession == null)
{
clientSession = new MqttClientSession(_options, DispatchPublishPacket);
_clientSessions.TryAdd(connectPacket.ClientId, clientSession);
}

await clientSession.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter);
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttServer), exception, exception.Message);
}
finally
{
await eventArgs.ClientAdapter.DisconnectAsync();
}
}

public void Clear()
{
_clientSessions.Clear();
}

private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
foreach (var clientSession in _clientSessions.Values.ToList())
{
clientSession.DispatchPublishPacket(senderClientSession, publishPacket);
}
}
}
}

+ 61
- 0
MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs 查看文件

@@ -0,0 +1,61 @@
using System;
using System.Collections.Concurrent;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Server
{
public class MqttClientSubscriptionsManager
{
private readonly ConcurrentDictionary<string, MqttQualityOfServiceLevel> _subscribedTopics = new ConcurrentDictionary<string, MqttQualityOfServiceLevel>();

public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));

var responsePacket = subscribePacket.CreateResponse<MqttSubAckPacket>();
foreach (var topicFilter in subscribePacket.TopicFilters)
{
_subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2.
}

return responsePacket;
}

public MqttUnsubAckPacket Unsubscribe(MqttUnsubscribePacket unsubscribePacket)
{
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));

foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
MqttQualityOfServiceLevel _;
_subscribedTopics.TryRemove(topicFilter, out _);
}

return unsubscribePacket.CreateResponse<MqttUnsubAckPacket>();
}

public bool IsTopicSubscribed(MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

foreach (var subscribedTopic in _subscribedTopics)
{
if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key))
{
continue;
}

if (subscribedTopic.Value < publishPacket.QualityOfServiceLevel)
{
continue;
}

return true;
}

return false;
}
}
}

+ 129
- 0
MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs 查看文件

@@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Internal;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Server
{
public class MqttOutgoingPublicationsManager
{
private readonly AutoResetEvent _resetEvent = new AutoResetEvent(false);
private readonly List<MqttClientPublishPacketContext> _pendingPublishPackets = new List<MqttClientPublishPacketContext>();

private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;

public MqttOutgoingPublicationsManager(MqttServerOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public void Start(IMqttCommunicationAdapter adapter)
{
if (_cancellationTokenSource != null)
{
throw new InvalidOperationException($"{nameof(MqttOutgoingPublicationsManager)} already started.");
}

_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();

Task.Run(async () => await SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
}

public void Stop()
{
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;
}

public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

lock (_pendingPublishPackets)
{
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket));
_resetEvent.Set();
}
}

private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
_resetEvent.WaitOne();
if (cancellationToken.IsCancellationRequested)
{
return;
}

List<MqttClientPublishPacketContext> pendingPublishPackets;
lock (_pendingPublishPackets)
{
pendingPublishPackets = _pendingPublishPackets.ToList();
}

foreach (var publishPacket in pendingPublishPackets)
{
await TrySendPendingPublishPacketAsync(publishPacket);
}
}
catch (Exception e)
{
MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), e, "Error while sending pending publish packets.");
}
finally
{
Cleanup();
}
}
}

private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketContext publishPacketContext)
{
try
{
if (_adapter == null)
{
return;
}

publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout);

publishPacketContext.IsSent = true;
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed.");
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed.");
}
finally
{
publishPacketContext.SendTries++;
}
}

private void Cleanup()
{
lock (_pendingPublishPackets)
{
_pendingPublishPackets.RemoveAll(p => p.IsSent);
}
}
}
}

+ 67
- 0
MQTTnet.Core/Server/MqttServer.cs 查看文件

@@ -0,0 +1,67 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Server
{
public class MqttServer
{
private readonly MqttClientSessionManager _clientSessionManager;
private readonly IMqttServerAdapter _adapter;
private readonly MqttServerOptions _options;

private CancellationTokenSource _cancellationTokenSource;

public MqttServer(MqttServerOptions options, IMqttServerAdapter adapter)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_clientSessionManager = new MqttClientSessionManager(options);
}

public void InjectClient(string identifier, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter));
}

public void Start()
{
if (_cancellationTokenSource != null)
{
throw new InvalidOperationException("The server is already started.");
}

_cancellationTokenSource = new CancellationTokenSource();

_adapter.ClientConnected += OnClientConnected;
_adapter.Start(_options);

MqttTrace.Information(nameof(MqttServer), "Started.");
}

public void Stop()
{
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;

_adapter.ClientConnected -= OnClientConnected;
_adapter.Stop();

_clientSessionManager.Clear();

MqttTrace.Information(nameof(MqttServer), "Stopped.");
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{
MqttTrace.Information(nameof(MqttServer), $"Client '{eventArgs.Identifier}': Connected.");
Task.Run(async () => await _clientSessionManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token).Forget();
}
}
}

+ 17
- 0
MQTTnet.Core/Server/MqttServerOptions.cs 查看文件

@@ -0,0 +1,17 @@
using System;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Server
{
public class MqttServerOptions
{
public int Port { get; set; } = 1883;

public int ConnectionBacklog { get; set; } = 10;

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public Func<MqttConnectPacket, MqttConnectReturnCode> ConnectionValidator { get; set; }
}
}

+ 53
- 0
MQTTnet.Core/Server/MqttTopicFilterComparer.cs 查看文件

@@ -0,0 +1,53 @@
using System;

namespace MQTTnet.Core.Server
{
public static class MqttTopicFilterComparer
{
private const char TopicLevelSeparator = '/';

public static bool IsMatch(string topic, string filter)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (filter == null) throw new ArgumentNullException(nameof(filter));

if (string.Equals(topic, filter, StringComparison.Ordinal))
{
return true;
}

var fragmentsTopic = topic.Split(new[] { TopicLevelSeparator }, StringSplitOptions.None);
var fragmentsFilter = filter.Split(new[] { TopicLevelSeparator }, StringSplitOptions.None);

for (var i = 0; i < fragmentsFilter.Length; i++)
{
if (fragmentsFilter[i] == "+")
{
continue;
}

if (fragmentsFilter[i] == "#" && i == fragmentsFilter.Length - 1)
{
return true;
}

if (i >= fragmentsTopic.Length)
{
return false;
}

if (!string.Equals(fragmentsFilter[i], fragmentsTopic[i]))
{
return false;
}
}

if (fragmentsTopic.Length > fragmentsFilter.Length)
{
return false;
}

return true;
}
}
}

+ 0
- 46
MQTTnet.NET/MqttTcpChannel.cs 查看文件

@@ -1,46 +0,0 @@
using System;
using System.Net.Sockets;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Client;

namespace MQTTnet.NETFramework
{
public class MqttTcpChannel : IMqttTransportChannel, IDisposable
{
private readonly Socket _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

public async Task ConnectAsync(MqttClientOptions options)
{
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null);
}

public async Task DisconnectAsync()
{
await Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null);
}

public async Task WriteAsync(byte[] buffer)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

await Task.Factory.FromAsync(
// ReSharper disable once AssignNullToNotNullAttribute
_socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null),
_socket.EndSend);
}

public async Task ReadAsync(byte[] buffer)
{
await Task.Factory.FromAsync(
// ReSharper disable once AssignNullToNotNullAttribute
_socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null),
_socket.EndReceive);
}

public void Dispose()
{
_socket?.Dispose();
}
}
}

+ 55
- 7
MQTTnet.sln 查看文件

@@ -1,17 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25420.1
# Visual Studio 15
VisualStudioVersion = 15.0.26228.4
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core", "MQTT.NET.Core\MQTTnet.Core.csproj", "{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core", "MQTTnet.Core\MQTTnet.Core.csproj", "{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestConsole", "MQTT.NET.TestConsole\MQTTnet.TestConsole.csproj", "{7B19B139-2E9D-4F1D-88B4-6180B4CF872A}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestMqttClient", "Tests\MQTTnet.TestMqttClient\MQTTnet.TestMqttClient.csproj", "{7B19B139-2E9D-4F1D-88B4-6180B4CF872A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "MQTT.NET.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NETFramework", "MQTTnet.NET\MQTTnet.NETFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NetFramework", "Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj", "{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Universal", "MQTTnet.Universal\MQTTnet.Universal.csproj", "{BD60C727-D8E8-40C3-B8E3-C95A864AE611}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.UniversalWindows", "Frameworks\MQTTnet.UniversalWindows\MQTTnet.UniversalWindows.csproj", "{BD60C727-D8E8-40C3-B8E3-C95A864AE611}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{9248C2E1-B9D6-40BF-81EC-86004D7765B4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.NetCore", "Frameworks\MQTTnet.NetCore\MQTTnet.NetCore.csproj", "{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Frameworks", "Frameworks", "{32A630A7-2598-41D7-B625-204CD906F5FB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestMqttServer", "Tests\MQTTnet.TestMqttServer\MQTTnet.TestMqttServer.csproj", "{6F8C0C0C-59EC-4921-9267-370AE113C34F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -105,8 +113,48 @@ Global
{BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x64.Build.0 = Release|x64
{BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x86.ActiveCfg = Release|x86
{BD60C727-D8E8-40C3-B8E3-C95A864AE611}.Release|x86.Build.0 = Release|x86
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|ARM.ActiveCfg = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|ARM.Build.0 = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|x64.ActiveCfg = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|x64.Build.0 = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|x86.ActiveCfg = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Debug|x86.Build.0 = Debug|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|Any CPU.Build.0 = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|ARM.ActiveCfg = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|ARM.Build.0 = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|x64.ActiveCfg = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|x64.Build.0 = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|x86.ActiveCfg = Release|Any CPU
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C}.Release|x86.Build.0 = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|ARM.ActiveCfg = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|ARM.Build.0 = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|x64.ActiveCfg = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|x64.Build.0 = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|x86.ActiveCfg = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Debug|x86.Build.0 = Debug|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|Any CPU.Build.0 = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|ARM.ActiveCfg = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|ARM.Build.0 = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|x64.ActiveCfg = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|x64.Build.0 = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|x86.ActiveCfg = Release|Any CPU
{6F8C0C0C-59EC-4921-9267-370AE113C34F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7B19B139-2E9D-4F1D-88B4-6180B4CF872A} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{BD60C727-D8E8-40C3-B8E3-C95A864AE611} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{88BE3FC9-79DC-4440-AC6B-C21BD97C6A3C} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{6F8C0C0C-59EC-4921-9267-370AE113C34F} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
EndGlobalSection
EndGlobal

+ 32
- 1
README.md 查看文件

@@ -5,6 +5,12 @@
# MQTTnet
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server.

## Supported frameworks

* .NET Framework 4.5.2+
* .NET Core 1.0+
* Universal Windows (UWP) 10.0.10240+

# MqttClient
## Example

@@ -81,4 +87,29 @@ while (true)
# MqttServer

## Example
TBD

```c#
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
}
};

var mqttServer = new MqttServerFactory().CreateMqttServer(options);
mqttServer.Start();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();

mqttServer.Stop();
```

MQTT.NET.Core.Tests/ByteReaderTests.cs → Tests/MQTTnet.Core.Tests/ByteReaderTests.cs 查看文件


MQTT.NET.Core.Tests/ByteWriterTests.cs → Tests/MQTTnet.Core.Tests/ByteWriterTests.cs 查看文件


MQTT.NET.Core.Tests/DefaultMqttV311PacketSerializerTests.cs → Tests/MQTTnet.Core.Tests/DefaultMqttV311PacketSerializerTests.cs 查看文件

@@ -361,7 +361,7 @@ namespace MQTTnet.Core.Tests
}


public class TestChannel : IMqttTransportChannel
public class TestChannel : IMqttCommunicationChannel
{
private readonly MemoryStream _stream = new MemoryStream();


MQTT.NET.Core.Tests/MQTTnet.Core.Tests.csproj → Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj 查看文件

@@ -9,8 +9,9 @@
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MQTTnet.Core.Tests</RootNamespace>
<AssemblyName>MQTTnet.Core.Tests</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -33,28 +34,26 @@
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="ByteReaderTests.cs" />
<Compile Include="ByteWriterTests.cs" />
<Compile Include="DefaultMqttV311PacketSerializerTests.cs" />
<Compile Include="MqttServerTests.cs" />
<Compile Include="MqttSubscriptionsManagerTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestMqttServerAdapter.cs" />
<Compile Include="TopicFilterComparerTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MQTT.NET.Core\MQTTnet.Core.csproj">
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj">
<Project>{99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE}</Project>
<Name>MQTTnet.Core</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.

+ 161
- 0
Tests/MQTTnet.Core.Tests/MqttServerTests.cs 查看文件

@@ -0,0 +1,161 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;

namespace MQTTnet.Core.Tests
{
[TestClass]
public class MqttServerTests
{
[TestMethod]
public async Task MqttServer_PublishSimple_AtMostOnce()
{
await TestPublishAsync(
"A/B/C",
MqttQualityOfServiceLevel.AtMostOnce,
"A/B/C",
MqttQualityOfServiceLevel.AtMostOnce,
1);
}

[TestMethod]
public async Task MqttServer_PublishSimple_AtLeastOnce()
{
await TestPublishAsync(
"A/B/C",
MqttQualityOfServiceLevel.AtLeastOnce,
"A/B/C",
MqttQualityOfServiceLevel.AtLeastOnce,
1);
}

[TestMethod]
public async Task MqttServer_PublishSimple_ExactlyOnce()
{
await TestPublishAsync(
"A/B/C",
MqttQualityOfServiceLevel.ExactlyOnce,
"A/B/C",
MqttQualityOfServiceLevel.ExactlyOnce,
1);
}

[TestMethod]
public async Task MqttServer_WillMessage()
{
var s = new MqttServer(new MqttServerOptions(), new TestMqttServerAdapter());
s.Start();

var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false);
var c1 = ConnectTestClient("c1", null, s);
var c2 = ConnectTestClient("c2", willMessage, s);

var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));

await c2.DisconnectAsync();

await Task.Delay(1000);

s.Stop();

Assert.AreEqual(1, receivedMessagesCount);
}

private MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server)
{
var adapterA = new TestMqttClientAdapter();
var adapterB = new TestMqttClientAdapter();
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;

var client = new MqttClient(new MqttClientOptions(), adapterA);
server.InjectClient(clientId, adapterB);
client.ConnectAsync(willMessage).Wait();
return client;
}

private async Task TestPublishAsync(
string topic,
MqttQualityOfServiceLevel qualityOfServiceLevel,
string topicFilter,
MqttQualityOfServiceLevel filterQualityOfServiceLevel,
int expectedReceivedMessagesCount)
{
var s = new MqttServer(new MqttServerOptions(), new TestMqttServerAdapter());
s.Start();

var c1 = ConnectTestClient("c1", null, s);
var c2 = ConnectTestClient("c2", null, s);

var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;

await c1.SubscribeAsync(new TopicFilter(topicFilter, filterQualityOfServiceLevel));
await c2.PublishAsync(new MqttApplicationMessage(topic, new byte[0], qualityOfServiceLevel, false));

await Task.Delay(500);
await c1.Unsubscribe(topicFilter);

await Task.Delay(500);

s.Stop();

Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
}
}

public class TestMqttClientAdapter : IMqttCommunicationAdapter
{
private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();

public TestMqttClientAdapter Partner { get; set; }

public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{
await Task.FromResult(0);
}

public async Task DisconnectAsync()
{
await Task.FromResult(0);
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
{
ThrowIfPartnerIsNull();

Partner.SendPacketInternal(packet);
await Task.FromResult(0);
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
{
ThrowIfPartnerIsNull();

return await Task.Run(() => _incomingPackets.Take());
}

private void SendPacketInternal(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

_incomingPackets.Add(packet);
}

private void ThrowIfPartnerIsNull()
{
if (Partner == null)
{
throw new InvalidOperationException("Partner is not set.");
}
}
}
}

+ 74
- 0
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs 查看文件

@@ -0,0 +1,74 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;

namespace MQTTnet.Core.Tests
{
[TestClass]
public class MqttSubscriptionsManagerTests
{
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{
var sm = new MqttClientSubscriptionsManager();

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));

sm.Subscribe(sp);

var pp = new MqttPublishPacket
{
Topic = "A/B/C",
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsTrue(sm.IsTopicSubscribed(pp));
}

[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{
var sm = new MqttClientSubscriptionsManager();

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));

sm.Subscribe(sp);

var pp = new MqttPublishPacket
{
Topic = "A/B/X",
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsFalse(sm.IsTopicSubscribed(pp));
}

[TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{
var sm = new MqttClientSubscriptionsManager();

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));

sm.Subscribe(sp);

var pp = new MqttPublishPacket
{
Topic = "A/B/C",
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsTrue(sm.IsTopicSubscribed(pp));

var up = new MqttUnsubscribePacket();
up.TopicFilters.Add("A/B/C");
sm.Unsubscribe(up);

Assert.IsFalse(sm.IsTopicSubscribed(pp));
}
}
}

MQTT.NET.Core.Tests/Properties/AssemblyInfo.cs → Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs 查看文件

@@ -6,7 +6,7 @@ using System.Runtime.InteropServices;
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Christian Kratky")]
[assembly: AssemblyProduct("MQTTnet")]
[assembly: AssemblyCopyright("Copyright © Christian Kratky 2015-2017")]
[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]

部分文件因文件數量過多而無法顯示

Loading…
取消
儲存