Browse Source

Merge from develop.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
6483f896c1
27 changed files with 837 additions and 225 deletions
  1. +1
    -1
      Build/MQTTnet.AspNetCore.nuspec
  2. +1
    -1
      Build/MQTTnet.Extensions.ManagedClient.nuspec
  3. +1
    -1
      Build/MQTTnet.Extensions.Rpc.nuspec
  4. +7
    -2
      Build/MQTTnet.nuspec
  5. +222
    -0
      MQTTnet.noUWP.sln
  6. +2
    -1
      README.md
  7. +1
    -0
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
  8. +2
    -1
      Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj
  9. +42
    -14
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  10. +2
    -1
      Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
  11. +1
    -0
      Source/MQTTnet/Client/IMqttClient.cs
  12. +29
    -28
      Source/MQTTnet/Client/MqttClient.cs
  13. +5
    -5
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  14. +34
    -0
      Source/MQTTnet/Internal/BlockingQueue.cs
  15. +8
    -2
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  16. +179
    -78
      Source/MQTTnet/Server/MqttClientSession.cs
  17. +23
    -14
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  18. +5
    -5
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  19. +9
    -29
      Source/MQTTnet/Server/MqttServer.cs
  20. +42
    -0
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  21. +3
    -2
      Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
  22. +95
    -3
      Tests/MQTTnet.Core.Tests/MqttClientTests.cs
  23. +114
    -21
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  24. +5
    -7
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
  25. +2
    -1
      Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
  26. +1
    -7
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  27. +1
    -1
      appveyor.yml

+ 1
- 1
Build/MQTTnet.AspNetCore.nuspec View File

@@ -14,7 +14,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.5-rc3" />
<dependency id="MQTTnet" version="2.8.5" />
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.3" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.1.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.1.1" />


+ 1
- 1
Build/MQTTnet.Extensions.ManagedClient.nuspec View File

@@ -14,7 +14,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.5-rc3" />
<dependency id="MQTTnet" version="2.8.5" />
</dependencies>
</metadata>



+ 1
- 1
Build/MQTTnet.Extensions.Rpc.nuspec View File

@@ -14,7 +14,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.5-rc3" />
<dependency id="MQTTnet" version="2.8.5" />
</dependencies>
</metadata>



+ 7
- 2
Build/MQTTnet.nuspec View File

@@ -12,12 +12,17 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> * [Core] Updated nuget packages due to security issues.
* [Client] Fixed wrong behavior of publish method when client is disconnecting (thanks to @PaulFake).
* [Client] Added readonly property for accessing options.
* [ManagedClient] Added max pending messages count option.
* [ManagedClient] Add pending messages overflow strategy option.
* [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake).
* [ManagedClient] Added readonly property for accessing options.
* [Server] Added new method which exposes all retained messages.
* [Server] Removed (wrong) setter from the server options interface.
* [Server] fixed cpu spike in case a client disconnectes (issue 421).
* [Server] fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport.
* [Server] Fixed cpu spike in case a client disconnects (issue 421).
* [Server] Fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport.
* [Server] Fixed wrong retain flag when distributing application messages (thanks to @trev0115).
* [Server] Fixed issue which closes a connection when reconnecting with the same client ID (thanks to @fogzot).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 222
- 0
MQTTnet.noUWP.sln View File

@@ -0,0 +1,222 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{9248C2E1-B9D6-40BF-81EC-86004D7765B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Source", "Source", "{32A630A7-2598-41D7-B625-204CD906F5FB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet", "Source\MQTTnet\MQTTnet.csproj", "{3587E506-55A2-4EB3-99C7-DC01E42D25D2}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1-BC3A-420A-BE9C-FA2401431CF9}"
ProjectSection(SolutionItems) = preProject
Build\build.ps1 = Build\build.ps1
Build\MQTTnet.AspNetCore.nuspec = Build\MQTTnet.AspNetCore.nuspec
Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec
Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec
Build\MQTTnet.nuspec = Build\MQTTnet.nuspec
Build\upload.ps1 = Build\upload.ps1
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}"
ProjectSection(SolutionItems) = preProject
.bettercodehub.yml = .bettercodehub.yml
appveyor.yml = appveyor.yml
LICENSE = LICENSE
README.md = README.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore", "Source\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{12816BCC-AF9E-44A9-9AE5-C246AF2A0587}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.ManagedClient", "Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj", "{C400533A-8EBA-4F0B-BF4D-295C3708604B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", "Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|ARM = Debug|ARM
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|ARM = Release|ARM
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.ActiveCfg = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.Build.0 = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.ActiveCfg = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.Build.0 = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.ActiveCfg = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.Build.0 = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.ActiveCfg = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.ActiveCfg = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.ActiveCfg = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.Build.0 = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.ActiveCfg = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.Build.0 = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.ActiveCfg = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.Build.0 = Debug|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.Build.0 = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.ActiveCfg = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.Build.0 = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.ActiveCfg = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.Build.0 = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.ActiveCfg = Release|Any CPU
{3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.ActiveCfg = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.Build.0 = Debug|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU
{F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.ActiveCfg = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.Build.0 = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.ActiveCfg = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.Build.0 = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.ActiveCfg = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.Build.0 = Debug|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.Build.0 = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.ActiveCfg = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.Build.0 = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.ActiveCfg = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU
{C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.ActiveCfg = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.Build.0 = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.ActiveCfg = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.Build.0 = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.ActiveCfg = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.Build.0 = Debug|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.Build.0 = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.ActiveCfg = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.Build.0 = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.ActiveCfg = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.Build.0 = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.ActiveCfg = Release|Any CPU
{998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.Build.0 = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.ActiveCfg = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.Build.0 = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.ActiveCfg = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.Build.0 = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.ActiveCfg = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.Build.0 = Debug|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.Build.0 = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.ActiveCfg = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.Build.0 = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.ActiveCfg = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.Build.0 = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.ActiveCfg = Release|Any CPU
{C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.Build.0 = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.ActiveCfg = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.Build.0 = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.ActiveCfg = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.Build.0 = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.ActiveCfg = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.Build.0 = Debug|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.Build.0 = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.ActiveCfg = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.Build.0 = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.ActiveCfg = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.Build.0 = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.ActiveCfg = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{C444E9C8-95FA-430E-9126-274129DE16CD} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
{998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}
EndGlobalSection
EndGlobal

+ 2
- 1
README.md View File

@@ -86,12 +86,13 @@ This project also listed at Open Collective (https://opencollective.com/mqttnet)

This library is used in the following projects:

* Azure Functions MQTT Bindings, [CaseOnline.Azure.WebJobs.Extensions.Mqtt](https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/)
* HA4IoT (Open Source Home Automation system for .NET, <https://github.com/chkr1011/HA4IoT>)
* MQTT Client Rx (Wrapper for Reactive Extensions, <https://github.com/1iveowl/MQTTClient.rx>)
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8))
* Wirehome.Core (Open Source Home Automation system for .NET Core, <https://github.com/chkr1011/Wirehome.Core>)

If you use this library and want to see your project here please let me know.
If you use this library and want to see your project here please create a pull request.

## MIT License



+ 1
- 0
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs View File

@@ -10,6 +10,7 @@ namespace MQTTnet.Extensions.ManagedClient
bool IsStarted { get; }
bool IsConnected { get; }
int PendingApplicationMessagesCount { get; }
IManagedMqttClientOptions Options { get; }

event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;


+ 2
- 1
Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj View File

@@ -1,7 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard1.3;netstandard2.0;net452;net461</TargetFrameworks>
<TargetFrameworks>netstandard1.3;netstandard2.0</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' AND '$(MSBuildRuntimeType)' != 'Core'">$(TargetFrameworks);uap10.0</TargetFrameworks>
<Product />
<Company />


+ 42
- 14
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -27,8 +27,7 @@ namespace MQTTnet.Extensions.ManagedClient
private CancellationTokenSource _publishingCancellationToken;

private ManagedMqttClientStorageManager _storageManager;
private IManagedMqttClientOptions _options;

private bool _subscriptionsNotPushed;

public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
@@ -47,6 +46,7 @@ namespace MQTTnet.Extensions.ManagedClient
public bool IsConnected => _mqttClient.IsConnected;
public bool IsStarted => _connectionCancellationToken != null;
public int PendingApplicationMessagesCount => _messageQueue.Count;
public IManagedMqttClientOptions Options { get; private set; }

public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
@@ -70,11 +70,11 @@ namespace MQTTnet.Extensions.ManagedClient

if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

_options = options;
Options = options;

if (_options.Storage != null)
if (Options.Storage != null)
{
_storageManager = new ManagedMqttClientStorageManager(_options.Storage);
_storageManager = new ManagedMqttClientStorageManager(Options.Storage);
var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

foreach (var message in messages)
@@ -116,16 +116,16 @@ namespace MQTTnet.Extensions.ManagedClient
ManagedMqttApplicationMessage removedMessage = null;
lock (_messageQueue)
{
if (_messageQueue.Count >= _options.MaxPendingMessages)
if (_messageQueue.Count >= Options.MaxPendingMessages)
{
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
return;
}

if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
@@ -219,7 +219,7 @@ namespace MQTTnet.Extensions.ManagedClient
if (connectionState == ReconnectionResult.NotConnected)
{
StopPublishing();
await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
return;
}

@@ -232,7 +232,7 @@ namespace MQTTnet.Extensions.ManagedClient

if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -254,7 +254,15 @@ namespace MQTTnet.Extensions.ManagedClient
{
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
{
var message = _messageQueue.Dequeue();
//Peek at the message without dequeueing in order to prevent the
//possibility of the queue growing beyond the configured cap.
//Previously, messages could be re-enqueued if there was an
//exception, and this re-enqueueing did not honor the cap.
//Furthermore, because re-enqueueing would shuffle the order
//of the messages, the DropOldestQueuedMessage strategy would
//be unable to know which message is actually the oldest and would
//instead drop the first item in the queue.
var message = _messageQueue.PeekAndWait();
if (message == null)
{
continue;
@@ -284,6 +292,16 @@ namespace MQTTnet.Extensions.ManagedClient
try
{
_mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
{
//While publishing this message, this.PublishAsync could have booted this
//message off the queue to make room for another (when using a cap
//with the DropOldestQueuedMessage strategy). If the first item
//in the queue is equal to this message, then it's safe to remove
//it from the queue. If not, that means this.PublishAsync has already
//removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}
_storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
}
catch (MqttCommunicationException exception)
@@ -292,9 +310,19 @@ namespace MQTTnet.Extensions.ManagedClient

_logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");

if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
_messageQueue.Enqueue(message);
//If QoS 0, we don't want this message to stay on the queue.
//If QoS 1 or 2, it's possible that, when using a cap, this message
//has been booted off the queue by this.PublishAsync, in which case this
//thread will not continue to try to publish it. While this does
//contradict the expected behavior of QoS 1 and 2, that's also true
//for the usage of a message queue cap, so it's still consistent
//with prior behavior in that way.
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
{
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}
}
}
catch (Exception exception)
@@ -360,7 +388,7 @@ namespace MQTTnet.Extensions.ManagedClient

try
{
await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
return ReconnectionResult.Reconnected;
}
catch (Exception exception)


+ 2
- 1
Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj View File

@@ -1,7 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard1.3;netstandard2.0;net452;net461</TargetFrameworks>
<TargetFrameworks>netstandard1.3;netstandard2.0</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' AND '$(MSBuildRuntimeType)' != 'Core'">$(TargetFrameworks);uap10.0</TargetFrameworks>
<Product />
<Company />


+ 1
- 0
Source/MQTTnet/Client/IMqttClient.cs View File

@@ -7,6 +7,7 @@ namespace MQTTnet.Client
public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable
{
bool IsConnected { get; }
IMqttClientOptions Options { get; }

event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;


+ 29
- 28
Source/MQTTnet/Client/MqttClient.cs View File

@@ -25,7 +25,6 @@ namespace MQTTnet.Client
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetChildLogger _logger;

private IMqttClientOptions _options;
private CancellationTokenSource _cancellationTokenSource;
internal Task _packetReceiverTask;
internal Task _keepAliveMessageSenderTask;
@@ -46,6 +45,7 @@ namespace MQTTnet.Client
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected { get; private set; }
public IMqttClientOptions Options { get; private set; }

public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
{
@@ -56,16 +56,17 @@ namespace MQTTnet.Client

try
{
_options = options;
Options = options;

_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();

_cancellationTokenSource = new CancellationTokenSource();
_disconnectGate = 0;
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);
_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose($"Trying to connect with server ({Options.ChannelOptions}).");
await _adapter.ConnectAsync(Options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

StartReceivingPackets(_cancellationTokenSource.Token);
@@ -75,7 +76,7 @@ namespace MQTTnet.Client

_sendTracker.Restart();

if (_options.KeepAlivePeriod != TimeSpan.Zero)
if (Options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
}
@@ -141,7 +142,7 @@ namespace MQTTnet.Client
return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
public Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

@@ -153,7 +154,7 @@ namespace MQTTnet.Client
TopicFilters = topicFilters.ToList()
};

await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false);
return SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket, _cancellationTokenSource.Token);
}

public Task PublishAsync(MqttApplicationMessage applicationMessage)
@@ -201,11 +202,11 @@ namespace MQTTnet.Client
{
var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.Credentials?.Username,
Password = _options.Credentials?.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
ClientId = Options.ClientId,
Username = Options.Credentials?.Username,
Password = Options.Credentials?.Password,
CleanSession = Options.CleanSession,
KeepAlivePeriod = (ushort)Options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

@@ -241,14 +242,14 @@ namespace MQTTnet.Client

try
{
await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);

if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}

await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);
_logger.Verbose("Disconnected from adapter.");
}
catch (Exception adapterException)
@@ -310,7 +311,7 @@ namespace MQTTnet.Client
try
{
await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false);
var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);

return (TResponsePacket)respone;
}
@@ -327,16 +328,16 @@ namespace MQTTnet.Client

private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
_logger.Verbose("Start sending keep alive packets.");

try
{
_logger.Verbose("Start sending keep alive packets.");

while (!cancellationToken.IsCancellationRequested)
{
var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
if (_options.KeepAliveSendInterval.HasValue)
var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75);
if (Options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}

var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
@@ -381,10 +382,10 @@ namespace MQTTnet.Client

private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
_logger.Verbose("Start receiving packets.");

try
{
_logger.Verbose("Start receiving packets.");

while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken)
@@ -507,7 +508,7 @@ namespace MQTTnet.Client
() => ReceivePacketsAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
TaskScheduler.Default).Unwrap();
}

private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
@@ -516,7 +517,7 @@ namespace MQTTnet.Client
() => SendKeepAliveMessagesAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
TaskScheduler.Default).Unwrap();
}

private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
@@ -525,7 +526,7 @@ namespace MQTTnet.Client
{
// TODO: Move conversion to formatter.
var applicationMessage = publishPacket.ToApplicationMessage();
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage));
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));
}
catch (Exception exception)
{


+ 5
- 5
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -2,7 +2,6 @@
using System;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using System.IO;
@@ -17,7 +16,7 @@ namespace MQTTnet.Implementations
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
private Socket _socket;
private Stream _stream;

@@ -87,8 +86,9 @@ namespace MQTTnet.Implementations

public void Dispose()
{
Cleanup(ref _stream, (s) => s.Dispose());
Cleanup(ref _socket, (s) => {
Cleanup(ref _stream, s => s.Dispose());
Cleanup(ref _socket, s =>
{
if (s.Connected)
{
s.Shutdown(SocketShutdown.Both);
@@ -102,7 +102,7 @@ namespace MQTTnet.Implementations
// Try the instance callback.
if (_options.TlsOptions.CertificateValidationCallback != null)
{
return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors,_clientOptions);
return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions);
}

// Try static callback.


+ 34
- 0
Source/MQTTnet/Internal/BlockingQueue.cs View File

@@ -55,6 +55,40 @@ namespace MQTTnet.Internal
_gate.WaitOne();
}
}
public TItem PeekAndWait()
{
while (true)
{
lock (_syncRoot)
{
if (_items.Count > 0)
{
return _items.First.Value;
}

if (_items.Count == 0)
{
_gate.Reset();
}
}

_gate.WaitOne();
}
}

public void RemoveFirst(Predicate<TItem> match)
{
if (match == null) throw new ArgumentNullException(nameof(match));

lock (_syncRoot)
{
if (_items.Count > 0 && match(_items.First.Value))
{
_items.RemoveFirst();
}
}
}

public TItem RemoveFirst()
{


+ 8
- 2
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs View File

@@ -36,8 +36,8 @@ namespace MQTTnet.Server
{
return;
}
Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken);
Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public void Pause()
@@ -50,6 +50,12 @@ namespace MQTTnet.Server
_isPaused = false;
}

public void Reset()
{
_lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart();
}

public void PacketReceived(MqttBasePacket packet)
{
_lastPacketReceivedTracker.Restart();


+ 179
- 78
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -9,6 +9,7 @@ using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Serializer;

namespace MQTTnet.Server
{
@@ -17,6 +18,7 @@ namespace MQTTnet.Server
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();

private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientPendingPacketsQueue _pendingPacketsQueue;
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
@@ -28,29 +30,33 @@ namespace MQTTnet.Server
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
private bool _wasCleanDisconnect;
private IMqttChannelAdapter _adapter;
private Task _workerTask;
private IDisposable _cleanupHandle;

private string _adapterEndpoint;
private MqttProtocolVersion? _adapterProtocolVersion;
public MqttClientSession(
string clientId,
IMqttServerOptions options,
MqttClientSessionsManager sessionsManager,
MqttRetainedMessagesManager retainedMessagesManager,
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

_options = options ?? throw new ArgumentNullException(nameof(options));
_sessionsManager = sessionsManager;
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));

ClientId = clientId;

_logger = logger.CreateChildLogger(nameof(MqttClientSession));

_keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger);
_subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server);
_subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, eventDispatcher);
_pendingPacketsQueue = new MqttClientPendingPacketsQueue(_options, this, _logger);
}

@@ -59,9 +65,9 @@ namespace MQTTnet.Server
public void FillStatus(MqttClientSessionStatus status)
{
status.ClientId = ClientId;
status.IsConnected = _adapter != null;
status.Endpoint = _adapter?.Endpoint;
status.ProtocolVersion = _adapter?.PacketFormatterAdapter?.ProtocolVersion;
status.IsConnected = _cancellationTokenSource != null;
status.Endpoint = _adapterEndpoint;
status.ProtocolVersion = _adapterProtocolVersion;
status.PendingApplicationMessagesCount = _pendingPacketsQueue.Count;
status.LastPacketReceived = _keepAliveMonitor.LastPacketReceived;
status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived;
@@ -73,24 +79,154 @@ namespace MQTTnet.Server
return _workerTask;
}

private async Task RunInternalAsync(MqttApplicationMessage willMessage, int keepAlivePeriod, IMqttChannelAdapter adapter)
public void Stop(MqttClientDisconnectType type)
{
Stop(type, false);
}

public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(publishPacket.Topic, publishPacket.QualityOfServiceLevel);
if (!checkSubscriptionsResult.IsSubscribed)
{
return;
}

publishPacket = new MqttPublishPacket
{
Topic = publishPacket.Topic,
Payload = publishPacket.Payload,
QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel,
Retain = publishPacket.Retain,
Dup = false
};

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

if (_options.ClientMessageQueueInterceptor != null)
{
var context = new MqttClientMessageQueueInterceptorContext(
senderClientSession?.ClientId,
ClientId,
publishPacket.ToApplicationMessage());

_options.ClientMessageQueueInterceptor?.Invoke(context);

if (!context.AcceptEnqueue || context.ApplicationMessage == null)
{
return;
}

publishPacket.Topic = context.ApplicationMessage.Topic;
publishPacket.Payload = context.ApplicationMessage.Payload;
publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel;
}

_pendingPacketsQueue.Enqueue(publishPacket);
}

public Task SubscribeAsync(IList<TopicFilter> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

_subscriptionsManager.Subscribe(new MqttSubscribePacket
{
TopicFilters = topicFilters
});

EnqueueSubscribedRetainedMessages(topicFilters);
return Task.FromResult(0);
}

public Task UnsubscribeAsync(IList<string> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

_subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket
{
TopicFilters = topicFilters
});

return Task.FromResult(0);
}

public void ClearPendingApplicationMessages()
{
_pendingPacketsQueue.Clear();
}

public void Dispose()
{
_pendingPacketsQueue?.Dispose();

_cancellationTokenSource?.Cancel ();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}

private void Stop(MqttClientDisconnectType type, bool isInsideSession)
{
try
{
var cts = _cancellationTokenSource;
if (cts == null || cts.IsCancellationRequested)
{
return;
}

_cancellationTokenSource?.Cancel(false);

_wasCleanDisconnect = type == MqttClientDisconnectType.Clean;

if (_willMessage != null && !_wasCleanDisconnect)
{
_sessionsManager.EnqueueApplicationMessage(this, _willMessage.ToPublishPacket());
}

_willMessage = null;

if (!isInsideSession)
{
_workerTask?.GetAwaiter().GetResult();
}
}
finally
{
_logger.Info("Client '{0}': Disconnected (clean={1}).", ClientId, _wasCleanDisconnect);
_eventDispatcher.OnClientDisconnected(ClientId, _wasCleanDisconnect);
}
}

private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

try
{
_adapter = adapter;
if (_cancellationTokenSource != null)
{
Stop(MqttClientDisconnectType.Clean, true);
}

adapter.ReadingPacketStarted += OnAdapterReadingPacketStarted;
adapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted;

_cancellationTokenSource = new CancellationTokenSource();

//woraround for https://github.com/dotnet/corefx/issues/24430
//workaround for https://github.com/dotnet/corefx/issues/24430
#pragma warning disable 4014
_cleanupHandle = _cancellationTokenSource.Token.Register(() => CleanupAsync());
_cleanupHandle = _cancellationTokenSource.Token.Register(async () =>
{
await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false);
TryDisposeAdapter(adapter);
});
#pragma warning restore 4014
//endworkaround
//end workaround

_wasCleanDisconnect = false;
_willMessage = willMessage;
@@ -98,6 +234,9 @@ namespace MQTTnet.Server
_pendingPacketsQueue.Start(adapter, _cancellationTokenSource.Token);
_keepAliveMonitor.Start(keepAlivePeriod, _cancellationTokenSource.Token);

_adapterEndpoint = adapter.Endpoint;
_adapterProtocolVersion = adapter.PacketSerializer.ProtocolVersion;

while (!_cancellationTokenSource.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false);
@@ -133,34 +272,35 @@ namespace MQTTnet.Server
}
finally
{
await CleanupAsync().ConfigureAwait(false);
_adapterEndpoint = null;
_adapterProtocolVersion = null;

// Uncomment as soon as the workaround above is no longer needed.
//await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false);
//TryDisposeAdapter(adapter);

_cleanupHandle?.Dispose();
_cleanupHandle = null;
_adapter = null;
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}
}

private async Task CleanupAsync()
private void TryDisposeAdapter(IMqttChannelAdapter adapter)
{
var adapter = _adapter;
try
if (adapter == null)
{
if (adapter == null)
{
return;
}

_adapter = null;
return;
}

try
{
adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted;
adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted;

await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
adapter.Dispose();
}
catch (Exception exception)
{
@@ -231,65 +371,21 @@ namespace MQTTnet.Server
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

if (_options.ClientMessageQueueInterceptor != null)
private async Task TryDisconnectAdapterAsync(IMqttChannelAdapter adapter)
{
if (adapter == null)
{
var context = new MqttClientMessageQueueInterceptorContext(
senderClientSession?.ClientId,
ClientId,
publishPacket.ToApplicationMessage());

_options.ClientMessageQueueInterceptor?.Invoke(context);

if (!context.AcceptEnqueue || context.ApplicationMessage == null)
{
return;
}

publishPacket.Topic = context.ApplicationMessage.Topic;
publishPacket.Payload = context.ApplicationMessage.Payload;
publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel;
return;
}

_pendingPacketsQueue.Enqueue(publishPacket);
}

public Task SubscribeAsync(IList<TopicFilter> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

_subscriptionsManager.Subscribe(new MqttSubscribePacket
try
{
TopicFilters = topicFilters
});

EnqueueSubscribedRetainedMessages(topicFilters);
return Task.FromResult(0);
}

public Task UnsubscribeAsync(IList<string> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

_subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket
await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception exception)
{
TopicFilters = topicFilters
});

return Task.FromResult(0);
}

public void ClearPendingApplicationMessages()
{
_pendingPacketsQueue.Clear();
}

public void Dispose()
{
_pendingPacketsQueue?.Dispose();

_cancellationTokenSource?.Cancel ();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
_logger.Error(exception, "Error while disconnecting channel adapter.");
}
}

private void ProcessReceivedPacket(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
@@ -366,7 +462,12 @@ namespace MQTTnet.Server
var retainedMessages = _retainedMessagesManager.GetSubscribedMessages(topicFilters);
foreach (var applicationMessage in retainedMessages)
{
EnqueueApplicationMessage(null, applicationMessage);
var publishPacket = applicationMessage.ToPublishPacket();

// Set the retain flag to true according to [MQTT-3.3.1-8].
publishPacket.Retain = true;

EnqueueApplicationMessage(null, publishPacket);
}
}



+ 23
- 14
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -23,25 +23,29 @@ namespace MQTTnet.Server
private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();

private readonly CancellationToken _cancellationToken;
private readonly MqttServerEventDispatcher _eventDispatcher;

private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;

public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, IMqttNetChildLogger logger)
public MqttClientSessionsManager(
IMqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
_cancellationToken = cancellationToken;

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager));

_cancellationToken = cancellationToken;
_eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
_options = options ?? throw new ArgumentNullException(nameof(options));
Server = server ?? throw new ArgumentNullException(nameof(server));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
}

public MqttServer Server { get; }

public void Start()
{
Task.Factory.StartNew(() => TryProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
@@ -62,7 +66,7 @@ namespace MQTTnet.Server

public Task StartSession(IMqttChannelAdapter clientAdapter)
{
return Task.Run(() => RunSession(clientAdapter, _cancellationToken), _cancellationToken);
return Task.Run(() => RunSessionAsync(clientAdapter, _cancellationToken), _cancellationToken);
}

public IList<IMqttClientSessionStatus> GetClientStatus()
@@ -177,7 +181,7 @@ namespace MQTTnet.Server
applicationMessage = interceptorContext.ApplicationMessage;
}

Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);
_eventDispatcher.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);

if (applicationMessage.Retain)
{
@@ -186,7 +190,12 @@ namespace MQTTnet.Server

foreach (var clientSession in GetSessions())
{
clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage);
var publishPacket = applicationMessage.ToPublishPacket();

// Set the retain flag to true according to [MQTT-3.3.1-9].
publishPacket.Retain = false;

clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, publishPacket);
}
}
catch (OperationCanceledException)
@@ -206,7 +215,7 @@ namespace MQTTnet.Server
}
}

private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
private async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
@@ -239,7 +248,6 @@ namespace MQTTnet.Server
}

var result = PrepareClientSession(connectPacket);
var clientSession = result.Session;

await clientAdapter.SendPacketAsync(
new MqttConnAckPacket
@@ -249,9 +257,10 @@ namespace MQTTnet.Server
},
cancellationToken).ConfigureAwait(false);

Server.OnClientConnected(clientId);
_logger.Info("Client '{0}': Connected.", clientId);
_eventDispatcher.OnClientConnected(clientId);

await clientSession.RunAsync(connectPacket.WillMessage, connectPacket.KeepAlivePeriod, clientAdapter).ConfigureAwait(false);
await result.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -315,7 +324,7 @@ namespace MQTTnet.Server
{
isExistingSession = false;

clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger);
clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _eventDispatcher, _logger);
_sessions[connectPacket.ClientId] = clientSession;

_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);


+ 5
- 5
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -10,14 +10,14 @@ namespace MQTTnet.Server
{
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly IMqttServerOptions _options;
private readonly MqttServer _server;
private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly string _clientId;

public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServer server)
public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServerEventDispatcher eventDispatcher)
{
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_options = options ?? throw new ArgumentNullException(nameof(options));
_server = server;
_eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
}

public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket)
@@ -58,7 +58,7 @@ namespace MQTTnet.Server
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
}

_server.OnClientSubscribedTopic(_clientId, topicFilter);
_eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter);
}
}

@@ -75,7 +75,7 @@ namespace MQTTnet.Server
{
_subscriptions.Remove(topicFilter);

_server.OnClientUnsubscribedTopic(_clientId, topicFilter);
_eventDispatcher.OnClientUnsubscribedTopic(_clientId, topicFilter);
}
}



+ 9
- 29
Source/MQTTnet/Server/MqttServer.cs View File

@@ -11,6 +11,7 @@ namespace MQTTnet.Server
{
public class MqttServer : IMqttServer
{
private readonly MqttServerEventDispatcher _eventDispatcher = new MqttServerEventDispatcher();
private readonly ICollection<IMqttServerAdapter> _adapters;
private readonly IMqttNetChildLogger _logger;

@@ -21,10 +22,16 @@ namespace MQTTnet.Server
public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetChildLogger logger)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
_adapters = adapters.ToList();

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttServer));

_adapters = adapters.ToList();
_eventDispatcher.ClientConnected += (s, e) => ClientConnected?.Invoke(s, e);
_eventDispatcher.ClientDisconnected += (s, e) => ClientDisconnected?.Invoke(s, e);
_eventDispatcher.ClientSubscribedTopic += (s, e) => ClientSubscribedTopic?.Invoke(s, e);
_eventDispatcher.ClientUnsubscribedTopic += (s, e) => ClientUnsubscribedTopic?.Invoke(s, e);
_eventDispatcher.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
}

public event EventHandler Started;
@@ -92,7 +99,7 @@ namespace MQTTnet.Server
_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);

_clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _cancellationTokenSource.Token, _logger);
_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
_clientSessionsManager.Start();

foreach (var adapter in _adapters)
@@ -144,33 +151,6 @@ namespace MQTTnet.Server
return _retainedMessagesManager?.ClearMessagesAsync();
}

internal void OnClientConnected(string clientId)
{
_logger.Info("Client '{0}': Connected.", clientId);
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId));
}

internal void OnClientDisconnected(string clientId, bool wasCleanDisconnect)
{
_logger.Info("Client '{0}': Disconnected (clean={1}).", clientId, wasCleanDisconnect);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect));
}

internal void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
{
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
}

internal void OnClientUnsubscribedTopic(string clientId, string topicFilter)
{
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
}

internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));
}

private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
eventArgs.SessionTask = _clientSessionsManager.StartSession(eventArgs.Client);


+ 42
- 0
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -0,0 +1,42 @@
using System;

namespace MQTTnet.Server
{
public class MqttServerEventDispatcher
{
public event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;

public event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
{
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
}

public void OnClientUnsubscribedTopic(string clientId, string topicFilter)
{
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
}

public void OnClientDisconnected(string clientId, bool wasCleanDisconnect)
{
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect));
}

public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
}

public void OnClientConnected(string clientId)
{
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId));
}
}
}

+ 3
- 2
Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj View File

@@ -3,8 +3,9 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<DebugType>Full</DebugType>
<TargetFramework>net461</TargetFramework>
<LangVersion>7.2</LangVersion>
<TargetFramework Condition=" '$(OS)' == 'Windows_NT' ">net461</TargetFramework>
<TargetFramework Condition=" '$(OS)' != 'Windows_NT' ">netcoreapp2.1</TargetFramework>
<LangVersion>7.2</LangVersion>
</PropertyGroup>

<ItemGroup>


+ 95
- 3
Tests/MQTTnet.Core.Tests/MqttClientTests.cs View File

@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -8,6 +10,7 @@ using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Server;

namespace MQTTnet.Core.Tests
{
@@ -15,7 +18,7 @@ namespace MQTTnet.Core.Tests
public class MqttClientTests
{
[TestMethod]
public async Task ClientDisconnectException()
public async Task Client_Disconnect_Exception()
{
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
@@ -39,9 +42,95 @@ namespace MQTTnet.Core.Tests
Assert.IsInstanceOfType(ex.InnerException, typeof(SocketException));
}

[TestMethod]
public async Task Client_Publish()
{
var server = new MqttFactory().CreateMqttServer();
try
{
var receivedMessages = new List<MqttApplicationMessage>();

await server.StartAsync(new MqttServerOptions());

var client1 = new MqttFactory().CreateMqttClient();
client1.ApplicationMessageReceived += (_, e) =>
{
lock (receivedMessages)
{
receivedMessages.Add(e.ApplicationMessage);
}
};

await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build());
await client1.SubscribeAsync("a");

var client2 = new MqttFactory().CreateMqttClient();
await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build());
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithRetainFlag().Build();
await client2.PublishAsync(message);

await Task.Delay(500);

Assert.AreEqual(1, receivedMessages.Count);
Assert.IsFalse(receivedMessages.First().Retain); // Must be false even if set above!
}
finally
{
await server.StopAsync();
}
}

[TestMethod]
public async Task Publish_Special_Content()
{
var factory = new MqttFactory();
var server = factory.CreateMqttServer();
var serverOptions = new MqttServerOptionsBuilder().Build();

var receivedMessages = new List<MqttApplicationMessage>();

var client = factory.CreateMqttClient();

try
{
await server.StartAsync(serverOptions);

client.Connected += async (s, e) =>
{
await client.SubscribeAsync("RCU/P1/H0001/R0003");

var msg = new MqttApplicationMessageBuilder()
.WithPayload("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|")
.WithTopic("RCU/P1/H0001/R0003");

await client.PublishAsync(msg.Build());
};

client.ApplicationMessageReceived += (s, e) =>
{
lock (receivedMessages)
{
receivedMessages.Add(e.ApplicationMessage);
}
};

await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());

await Task.Delay(500);

Assert.AreEqual(1, receivedMessages.Count);
Assert.AreEqual("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|", receivedMessages.First().ConvertPayloadToString());
}
finally
{
await server.StopAsync();
}
}

#if DEBUG
[TestMethod]
public async Task ClientCleanupOnAuthentificationFails()
public async Task Client_Cleanup_On_Authentification_Fails()
{
var channel = new TestMqttCommunicationAdapter();
var channel2 = new TestMqttCommunicationAdapter();
@@ -50,7 +139,10 @@ namespace MQTTnet.Core.Tests

Task.Run(async () => {
var connect = await channel2.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None);
await channel2.SendPacketAsync(new MqttConnAckPacket { ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized }, CancellationToken.None);
await channel2.SendPacketAsync(new MqttConnAckPacket
{
ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized
}, CancellationToken.None);
});
var fake = new TestMqttCommunicationAdapterFactory(channel);


+ 114
- 21
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -4,6 +4,7 @@ using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -227,6 +228,39 @@ namespace MQTTnet.Core.Tests

Assert.AreEqual(2000, receivedMessagesCount);
}
[TestMethod]
public async Task MqttServer_SessionTakeover()
{
var server = new MqttFactory().CreateMqttServer();
try
{
await server.StartAsync(new MqttServerOptions());

var client1 = new MqttFactory().CreateMqttClient();
var client2 = new MqttFactory().CreateMqttClient();

var options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost")
.WithCleanSession(false)
.WithClientId("a").Build();

await client1.ConnectAsync(options);

await Task.Delay(500);

await client2.ConnectAsync(options);

await Task.Delay(500);

Assert.IsFalse(client1.IsConnected);
Assert.IsTrue(client2.IsConnected);
}
finally
{
await server.StopAsync();
}
}

private static async Task Publish(IMqttClient c1, MqttApplicationMessage message)
{
@@ -267,11 +301,6 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public async Task MqttServer_HandleCleanDisconnect()
{
MqttNetGlobalLogger.LogMessagePublished += (_, e) =>
{
System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}");
};

var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

@@ -304,6 +333,51 @@ namespace MQTTnet.Core.Tests
Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled);
}

[TestMethod]
public async Task MqttServer_Client_Disconnect_Without_Errors()
{
var errors = 0;

MqttNetGlobalLogger.LogMessagePublished += (_, e) =>
{
System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}");

if (e.TraceMessage.Level == MqttNetLogLevel.Error)
{
errors++;
}
};

bool clientWasConnected;

var server = new MqttFactory().CreateMqttServer();
try
{
var options = new MqttServerOptionsBuilder().Build();
await server.StartAsync(options);

var client = new MqttFactory().CreateMqttClient();
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost")
.Build();

await client.ConnectAsync(clientOptions);

clientWasConnected = true;

await client.DisconnectAsync();

await Task.Delay(500);
}
finally
{
await server.StopAsync();
}
Assert.IsTrue(clientWasConnected);
Assert.AreEqual(0, errors);
}

[TestMethod]
public async Task MqttServer_LotsOfRetainedMessages()
{
@@ -389,6 +463,8 @@ namespace MQTTnet.Core.Tests
}

await c2.DisconnectAsync();

await s.StopAsync();
}

[TestMethod]
@@ -427,7 +503,7 @@ namespace MQTTnet.Core.Tests
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

var receivedMessagesCount = 0;
var receivedMessages = new List<MqttApplicationMessage>();
try
{
await s.StartAsync(new MqttServerOptions());
@@ -437,7 +513,14 @@ namespace MQTTnet.Core.Tests
await c1.DisconnectAsync();

var c2 = await serverAdapter.ConnectTestClient("c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
c2.ApplicationMessageReceived += (_, e) =>
{
lock (receivedMessages)
{
receivedMessages.Add(e.ApplicationMessage);
}
};

await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());

await Task.Delay(500);
@@ -447,7 +530,8 @@ namespace MQTTnet.Core.Tests
await s.StopAsync();
}

Assert.AreEqual(1, receivedMessagesCount);
Assert.AreEqual(1, receivedMessages.Count);
Assert.IsTrue(receivedMessages.First().Retain);
}

[TestMethod]
@@ -655,26 +739,29 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
{
var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
var s = new MqttFactory().CreateMqttServer();

var connectedClient = false;
var connecteCalledBeforeConnectedClients = false;
var events = new List<string>();

s.ClientConnected += (_, __) =>
{
connecteCalledBeforeConnectedClients |= connectedClient;
connectedClient = true;
lock (events)
{
events.Add("c");
}
};

s.ClientDisconnected += (_, __) =>
{
connectedClient = false;
lock (events)
{
events.Add("d");
}
};

var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost")
.WithClientId(Guid.NewGuid().ToString())
.WithClientId("same_id")
.Build();

await s.StartAsync(new MqttServerOptions());
@@ -684,20 +771,24 @@ namespace MQTTnet.Core.Tests

await c1.ConnectAsync(clientOptions);

await Task.Delay(100);
await Task.Delay(250);

await c2.ConnectAsync(clientOptions);

await Task.Delay(100);
await Task.Delay(250);

await c1.DisconnectAsync();

await Task.Delay(250);

await c2.DisconnectAsync();

await s.StopAsync();
await Task.Delay(250);

await Task.Delay(100);
await s.StopAsync();

Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called");
var flow = string.Join(string.Empty, events);
Assert.AreEqual("cdcd", flow);
}


@@ -725,6 +816,8 @@ namespace MQTTnet.Core.Tests
await server.StartAsync(new MqttServerOptions());
var client3 = new MqttFactory().CreateMqttClient();
await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());

await server.StopAsync();
}

private class TestStorage : IMqttServerStorage


+ 5
- 7
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs View File

@@ -1,6 +1,4 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
@@ -13,7 +11,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -28,7 +26,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
@@ -43,7 +41,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
@@ -59,7 +57,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -72,7 +70,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());


+ 2
- 1
Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj View File

@@ -3,7 +3,8 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<DebugType>Full</DebugType>
<TargetFrameworks>netcoreapp2.1;net452;net461</TargetFrameworks>
<TargetFrameworks>netcoreapp2.1</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461</TargetFrameworks>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netcoreapp2.1|AnyCPU'">


+ 1
- 7
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -61,13 +61,7 @@ namespace MQTTnet.TestApp.NetCore

public class RandomPassword : IMqttClientCredentials
{
public string Password
{
get
{
return Guid.NewGuid().ToString(); // The random password.
}
}
public string Password => Guid.NewGuid().ToString();

public string Username => "the_static_user";
}


+ 1
- 1
appveyor.yml View File

@@ -3,7 +3,7 @@ image: Visual Studio 2017
configuration: Release
before_build:
- cmd: >-
msbuild /t:restore
msbuild /t:restore MQTTnet.sln

cd Tests/MQTTnet.TestApp.AspNetCore2/



Loading…
Cancel
Save