diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec
index 20489c7..e57035e 100644
--- a/Build/MQTTnet.AspNetCore.nuspec
+++ b/Build/MQTTnet.AspNetCore.nuspec
@@ -14,7 +14,7 @@
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
-
+
diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec
index 572f73d..a34c8d6 100644
--- a/Build/MQTTnet.Extensions.ManagedClient.nuspec
+++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec
@@ -14,7 +14,7 @@
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
-
+
diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec
index 29cec2b..bef3322 100644
--- a/Build/MQTTnet.Extensions.Rpc.nuspec
+++ b/Build/MQTTnet.Extensions.Rpc.nuspec
@@ -14,7 +14,7 @@
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
-
+
diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index e3a8d29..5814a73 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -12,12 +12,17 @@
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
* [Core] Updated nuget packages due to security issues.
* [Client] Fixed wrong behavior of publish method when client is disconnecting (thanks to @PaulFake).
+* [Client] Added readonly property for accessing options.
* [ManagedClient] Added max pending messages count option.
* [ManagedClient] Add pending messages overflow strategy option.
+* [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake).
+* [ManagedClient] Added readonly property for accessing options.
* [Server] Added new method which exposes all retained messages.
* [Server] Removed (wrong) setter from the server options interface.
-* [Server] fixed cpu spike in case a client disconnectes (issue 421).
-* [Server] fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport.
+* [Server] Fixed cpu spike in case a client disconnects (issue 421).
+* [Server] Fixed concurrent sends with Aspnetcore.Connections.Abstractions based transport.
+* [Server] Fixed wrong retain flag when distributing application messages (thanks to @trev0115).
+* [Server] Fixed issue which closes a connection when reconnecting with the same client ID (thanks to @fogzot).
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/MQTTnet.noUWP.sln b/MQTTnet.noUWP.sln
new file mode 100644
index 0000000..8d9130e
--- /dev/null
+++ b/MQTTnet.noUWP.sln
@@ -0,0 +1,222 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio 15
+VisualStudioVersion = 15.0.27004.2010
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{9248C2E1-B9D6-40BF-81EC-86004D7765B4}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Source", "Source", "{32A630A7-2598-41D7-B625-204CD906F5FB}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet", "Source\MQTTnet\MQTTnet.csproj", "{3587E506-55A2-4EB3-99C7-DC01E42D25D2}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1-BC3A-420A-BE9C-FA2401431CF9}"
+ ProjectSection(SolutionItems) = preProject
+ Build\build.ps1 = Build\build.ps1
+ Build\MQTTnet.AspNetCore.nuspec = Build\MQTTnet.AspNetCore.nuspec
+ Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec
+ Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec
+ Build\MQTTnet.nuspec = Build\MQTTnet.nuspec
+ Build\upload.ps1 = Build\upload.ps1
+ EndProjectSection
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}"
+ ProjectSection(SolutionItems) = preProject
+ .bettercodehub.yml = .bettercodehub.yml
+ appveyor.yml = appveyor.yml
+ LICENSE = LICENSE
+ README.md = README.md
+ EndProjectSection
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore", "Source\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj", "{F10C4060-F7EE-4A83-919F-FF723E72F94A}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{12816BCC-AF9E-44A9-9AE5-C246AF2A0587}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.Rpc", "Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj", "{C444E9C8-95FA-430E-9126-274129DE16CD}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Benchmarks", "Tests\MQTTnet.Benchmarks\MQTTnet.Benchmarks.csproj", "{998D04DD-7CB0-45F5-A393-E2495C16399E}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.ManagedClient", "Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj", "{C400533A-8EBA-4F0B-BF4D-295C3708604B}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", "Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Debug|ARM = Debug|ARM
+ Debug|x64 = Debug|x64
+ Debug|x86 = Debug|x86
+ Release|Any CPU = Release|Any CPU
+ Release|ARM = Release|ARM
+ Release|x64 = Release|x64
+ Release|x86 = Release|x86
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.Build.0 = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x64.Build.0 = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|x86.Build.0 = Debug|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.ActiveCfg = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|ARM.Build.0 = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.ActiveCfg = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|ARM.Build.0 = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x64.Build.0 = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Debug|x86.Build.0 = Debug|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.ActiveCfg = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|ARM.Build.0 = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.ActiveCfg = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x64.Build.0 = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.ActiveCfg = Release|Any CPU
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB}.Release|x86.Build.0 = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|ARM.Build.0 = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x64.Build.0 = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Debug|x86.Build.0 = Debug|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.ActiveCfg = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|ARM.Build.0 = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.ActiveCfg = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x64.Build.0 = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.ActiveCfg = Release|Any CPU
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}.Release|x86.Build.0 = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|ARM.Build.0 = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x64.Build.0 = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Debug|x86.Build.0 = Debug|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.ActiveCfg = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|ARM.Build.0 = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.ActiveCfg = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x64.Build.0 = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.ActiveCfg = Release|Any CPU
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A}.Release|x86.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|ARM.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x64.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Debug|x86.Build.0 = Debug|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|ARM.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x64.Build.0 = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.ActiveCfg = Release|Any CPU
+ {C444E9C8-95FA-430E-9126-274129DE16CD}.Release|x86.Build.0 = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|ARM.Build.0 = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x64.Build.0 = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Debug|x86.Build.0 = Debug|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.ActiveCfg = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|ARM.Build.0 = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.ActiveCfg = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x64.Build.0 = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.ActiveCfg = Release|Any CPU
+ {998D04DD-7CB0-45F5-A393-E2495C16399E}.Release|x86.Build.0 = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|ARM.Build.0 = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x64.Build.0 = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Debug|x86.Build.0 = Debug|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.ActiveCfg = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|ARM.Build.0 = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.ActiveCfg = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x64.Build.0 = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.ActiveCfg = Release|Any CPU
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B}.Release|x86.Build.0 = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|ARM.Build.0 = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x64.Build.0 = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Debug|x86.Build.0 = Debug|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.ActiveCfg = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|ARM.Build.0 = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.ActiveCfg = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.Build.0 = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.ActiveCfg = Release|Any CPU
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
+ {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
+ {3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
+ {C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
+ {F10C4060-F7EE-4A83-919F-FF723E72F94A} = {32A630A7-2598-41D7-B625-204CD906F5FB}
+ {C444E9C8-95FA-430E-9126-274129DE16CD} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
+ {998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
+ {C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
+ {61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}
+ EndGlobalSection
+EndGlobal
diff --git a/README.md b/README.md
index a187cc5..436628a 100644
--- a/README.md
+++ b/README.md
@@ -86,12 +86,13 @@ This project also listed at Open Collective (https://opencollective.com/mqttnet)
This library is used in the following projects:
+* Azure Functions MQTT Bindings, [CaseOnline.Azure.WebJobs.Extensions.Mqtt](https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/)
* HA4IoT (Open Source Home Automation system for .NET, )
* MQTT Client Rx (Wrapper for Reactive Extensions, )
* MQTT Tester (MQTT client test app for [Android](https://play.google.com/store/apps/details?id=com.liveowl.mqtttester) and [iOS](https://itunes.apple.com/us/app/mqtt-tester/id1278621826?mt=8))
* Wirehome.Core (Open Source Home Automation system for .NET Core, )
-If you use this library and want to see your project here please let me know.
+If you use this library and want to see your project here please create a pull request.
## MIT License
diff --git a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
index d003584..b3835e3 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
@@ -10,6 +10,7 @@ namespace MQTTnet.Extensions.ManagedClient
bool IsStarted { get; }
bool IsConnected { get; }
int PendingApplicationMessagesCount { get; }
+ IManagedMqttClientOptions Options { get; }
event EventHandler Connected;
event EventHandler Disconnected;
diff --git a/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj b/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj
index c8405a2..09dce3f 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj
+++ b/Source/MQTTnet.Extensions.ManagedClient/MQTTnet.Extensions.ManagedClient.csproj
@@ -1,7 +1,8 @@
- netstandard1.3;netstandard2.0;net452;net461
+ netstandard1.3;netstandard2.0
+ $(TargetFrameworks);net452;net461
$(TargetFrameworks);uap10.0
diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
index 271e6b9..866a75d 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
@@ -27,8 +27,7 @@ namespace MQTTnet.Extensions.ManagedClient
private CancellationTokenSource _publishingCancellationToken;
private ManagedMqttClientStorageManager _storageManager;
- private IManagedMqttClientOptions _options;
-
+
private bool _subscriptionsNotPushed;
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
@@ -47,6 +46,7 @@ namespace MQTTnet.Extensions.ManagedClient
public bool IsConnected => _mqttClient.IsConnected;
public bool IsStarted => _connectionCancellationToken != null;
public int PendingApplicationMessagesCount => _messageQueue.Count;
+ public IManagedMqttClientOptions Options { get; private set; }
public event EventHandler Connected;
public event EventHandler Disconnected;
@@ -70,11 +70,11 @@ namespace MQTTnet.Extensions.ManagedClient
if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");
- _options = options;
+ Options = options;
- if (_options.Storage != null)
+ if (Options.Storage != null)
{
- _storageManager = new ManagedMqttClientStorageManager(_options.Storage);
+ _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
foreach (var message in messages)
@@ -116,16 +116,16 @@ namespace MQTTnet.Extensions.ManagedClient
ManagedMqttApplicationMessage removedMessage = null;
lock (_messageQueue)
{
- if (_messageQueue.Count >= _options.MaxPendingMessages)
+ if (_messageQueue.Count >= Options.MaxPendingMessages)
{
- if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
+ if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
return;
}
- if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
+ if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
@@ -219,7 +219,7 @@ namespace MQTTnet.Extensions.ManagedClient
if (connectionState == ReconnectionResult.NotConnected)
{
StopPublishing();
- await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
+ await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
return;
}
@@ -232,7 +232,7 @@ namespace MQTTnet.Extensions.ManagedClient
if (connectionState == ReconnectionResult.StillConnected)
{
- await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
+ await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -254,7 +254,15 @@ namespace MQTTnet.Extensions.ManagedClient
{
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
{
- var message = _messageQueue.Dequeue();
+ //Peek at the message without dequeueing in order to prevent the
+ //possibility of the queue growing beyond the configured cap.
+ //Previously, messages could be re-enqueued if there was an
+ //exception, and this re-enqueueing did not honor the cap.
+ //Furthermore, because re-enqueueing would shuffle the order
+ //of the messages, the DropOldestQueuedMessage strategy would
+ //be unable to know which message is actually the oldest and would
+ //instead drop the first item in the queue.
+ var message = _messageQueue.PeekAndWait();
if (message == null)
{
continue;
@@ -284,6 +292,16 @@ namespace MQTTnet.Extensions.ManagedClient
try
{
_mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
+ lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
+ {
+ //While publishing this message, this.PublishAsync could have booted this
+ //message off the queue to make room for another (when using a cap
+ //with the DropOldestQueuedMessage strategy). If the first item
+ //in the queue is equal to this message, then it's safe to remove
+ //it from the queue. If not, that means this.PublishAsync has already
+ //removed it, in which case we don't want to do anything.
+ _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
+ }
_storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
}
catch (MqttCommunicationException exception)
@@ -292,9 +310,19 @@ namespace MQTTnet.Extensions.ManagedClient
_logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");
- if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
+ if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
- _messageQueue.Enqueue(message);
+ //If QoS 0, we don't want this message to stay on the queue.
+ //If QoS 1 or 2, it's possible that, when using a cap, this message
+ //has been booted off the queue by this.PublishAsync, in which case this
+ //thread will not continue to try to publish it. While this does
+ //contradict the expected behavior of QoS 1 and 2, that's also true
+ //for the usage of a message queue cap, so it's still consistent
+ //with prior behavior in that way.
+ lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
+ {
+ _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
+ }
}
}
catch (Exception exception)
@@ -360,7 +388,7 @@ namespace MQTTnet.Extensions.ManagedClient
try
{
- await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
+ await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
return ReconnectionResult.Reconnected;
}
catch (Exception exception)
diff --git a/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj b/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
index c8405a2..09dce3f 100644
--- a/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
+++ b/Source/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj
@@ -1,7 +1,8 @@
- netstandard1.3;netstandard2.0;net452;net461
+ netstandard1.3;netstandard2.0
+ $(TargetFrameworks);net452;net461
$(TargetFrameworks);uap10.0
diff --git a/Source/MQTTnet/Client/IMqttClient.cs b/Source/MQTTnet/Client/IMqttClient.cs
index 40d1a48..b60a31a 100644
--- a/Source/MQTTnet/Client/IMqttClient.cs
+++ b/Source/MQTTnet/Client/IMqttClient.cs
@@ -7,6 +7,7 @@ namespace MQTTnet.Client
public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable
{
bool IsConnected { get; }
+ IMqttClientOptions Options { get; }
event EventHandler Connected;
event EventHandler Disconnected;
diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs
index f525d2d..9b0fcc5 100644
--- a/Source/MQTTnet/Client/MqttClient.cs
+++ b/Source/MQTTnet/Client/MqttClient.cs
@@ -25,7 +25,6 @@ namespace MQTTnet.Client
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetChildLogger _logger;
- private IMqttClientOptions _options;
private CancellationTokenSource _cancellationTokenSource;
internal Task _packetReceiverTask;
internal Task _keepAliveMessageSenderTask;
@@ -46,6 +45,7 @@ namespace MQTTnet.Client
public event EventHandler ApplicationMessageReceived;
public bool IsConnected { get; private set; }
+ public IMqttClientOptions Options { get; private set; }
public async Task ConnectAsync(IMqttClientOptions options)
{
@@ -56,16 +56,17 @@ namespace MQTTnet.Client
try
{
- _options = options;
+ Options = options;
+
_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();
_cancellationTokenSource = new CancellationTokenSource();
_disconnectGate = 0;
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);
-
- _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
- await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
+
+ _logger.Verbose($"Trying to connect with server ({Options.ChannelOptions}).");
+ await _adapter.ConnectAsync(Options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");
StartReceivingPackets(_cancellationTokenSource.Token);
@@ -75,7 +76,7 @@ namespace MQTTnet.Client
_sendTracker.Restart();
- if (_options.KeepAlivePeriod != TimeSpan.Zero)
+ if (Options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
}
@@ -141,7 +142,7 @@ namespace MQTTnet.Client
return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
}
- public async Task UnsubscribeAsync(IEnumerable topicFilters)
+ public Task UnsubscribeAsync(IEnumerable topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
@@ -153,7 +154,7 @@ namespace MQTTnet.Client
TopicFilters = topicFilters.ToList()
};
- await SendAndReceiveAsync(unsubscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false);
+ return SendAndReceiveAsync(unsubscribePacket, _cancellationTokenSource.Token);
}
public Task PublishAsync(MqttApplicationMessage applicationMessage)
@@ -201,11 +202,11 @@ namespace MQTTnet.Client
{
var connectPacket = new MqttConnectPacket
{
- ClientId = _options.ClientId,
- Username = _options.Credentials?.Username,
- Password = _options.Credentials?.Password,
- CleanSession = _options.CleanSession,
- KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
+ ClientId = Options.ClientId,
+ Username = Options.Credentials?.Username,
+ Password = Options.Credentials?.Password,
+ CleanSession = Options.CleanSession,
+ KeepAlivePeriod = (ushort)Options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};
@@ -241,14 +242,14 @@ namespace MQTTnet.Client
try
{
- await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
- await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);
-
if (_adapter != null)
{
- await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
+ await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
+ await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
+ await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);
+
_logger.Verbose("Disconnected from adapter.");
}
catch (Exception adapterException)
@@ -310,7 +311,7 @@ namespace MQTTnet.Client
try
{
await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false);
- var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
+ var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
return (TResponsePacket)respone;
}
@@ -327,16 +328,16 @@ namespace MQTTnet.Client
private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
- _logger.Verbose("Start sending keep alive packets.");
-
try
{
+ _logger.Verbose("Start sending keep alive packets.");
+
while (!cancellationToken.IsCancellationRequested)
{
- var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
- if (_options.KeepAliveSendInterval.HasValue)
+ var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75);
+ if (Options.KeepAliveSendInterval.HasValue)
{
- keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
+ keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}
var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
@@ -381,10 +382,10 @@ namespace MQTTnet.Client
private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
- _logger.Verbose("Start receiving packets.");
-
try
{
+ _logger.Verbose("Start receiving packets.");
+
while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken)
@@ -507,7 +508,7 @@ namespace MQTTnet.Client
() => ReceivePacketsAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
- TaskScheduler.Default);
+ TaskScheduler.Default).Unwrap();
}
private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
@@ -516,7 +517,7 @@ namespace MQTTnet.Client
() => SendKeepAliveMessagesAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
- TaskScheduler.Default);
+ TaskScheduler.Default).Unwrap();
}
private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
@@ -525,7 +526,7 @@ namespace MQTTnet.Client
{
// TODO: Move conversion to formatter.
var applicationMessage = publishPacket.ToApplicationMessage();
- ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage));
+ ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));
}
catch (Exception exception)
{
diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
index 3928328..58eece6 100644
--- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
@@ -2,7 +2,6 @@
using System;
using System.Net.Security;
using System.Net.Sockets;
-using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using System.IO;
@@ -17,7 +16,7 @@ namespace MQTTnet.Implementations
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
-
+
private Socket _socket;
private Stream _stream;
@@ -87,8 +86,9 @@ namespace MQTTnet.Implementations
public void Dispose()
{
- Cleanup(ref _stream, (s) => s.Dispose());
- Cleanup(ref _socket, (s) => {
+ Cleanup(ref _stream, s => s.Dispose());
+ Cleanup(ref _socket, s =>
+ {
if (s.Connected)
{
s.Shutdown(SocketShutdown.Both);
@@ -102,7 +102,7 @@ namespace MQTTnet.Implementations
// Try the instance callback.
if (_options.TlsOptions.CertificateValidationCallback != null)
{
- return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors,_clientOptions);
+ return _options.TlsOptions.CertificateValidationCallback(x509Certificate, chain, sslPolicyErrors, _clientOptions);
}
// Try static callback.
diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs
index d6c28f4..485f644 100644
--- a/Source/MQTTnet/Internal/BlockingQueue.cs
+++ b/Source/MQTTnet/Internal/BlockingQueue.cs
@@ -55,6 +55,40 @@ namespace MQTTnet.Internal
_gate.WaitOne();
}
}
+
+ public TItem PeekAndWait()
+ {
+ while (true)
+ {
+ lock (_syncRoot)
+ {
+ if (_items.Count > 0)
+ {
+ return _items.First.Value;
+ }
+
+ if (_items.Count == 0)
+ {
+ _gate.Reset();
+ }
+ }
+
+ _gate.WaitOne();
+ }
+ }
+
+ public void RemoveFirst(Predicate match)
+ {
+ if (match == null) throw new ArgumentNullException(nameof(match));
+
+ lock (_syncRoot)
+ {
+ if (_items.Count > 0 && match(_items.First.Value))
+ {
+ _items.RemoveFirst();
+ }
+ }
+ }
public TItem RemoveFirst()
{
diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
index f1a209b..fbe1f87 100644
--- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
+++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
@@ -36,8 +36,8 @@ namespace MQTTnet.Server
{
return;
}
-
- Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken);
+
+ Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public void Pause()
@@ -50,6 +50,12 @@ namespace MQTTnet.Server
_isPaused = false;
}
+ public void Reset()
+ {
+ _lastPacketReceivedTracker.Restart();
+ _lastNonKeepAlivePacketReceivedTracker.Restart();
+ }
+
public void PacketReceived(MqttBasePacket packet)
{
_lastPacketReceivedTracker.Restart();
diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs
index cc949a4..c915dbc 100644
--- a/Source/MQTTnet/Server/MqttClientSession.cs
+++ b/Source/MQTTnet/Server/MqttClientSession.cs
@@ -9,6 +9,7 @@ using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
+using MQTTnet.Serializer;
namespace MQTTnet.Server
{
@@ -17,6 +18,7 @@ namespace MQTTnet.Server
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
+ private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientPendingPacketsQueue _pendingPacketsQueue;
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
@@ -28,29 +30,33 @@ namespace MQTTnet.Server
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;
private bool _wasCleanDisconnect;
- private IMqttChannelAdapter _adapter;
private Task _workerTask;
private IDisposable _cleanupHandle;
+ private string _adapterEndpoint;
+ private MqttProtocolVersion? _adapterProtocolVersion;
+
public MqttClientSession(
string clientId,
IMqttServerOptions options,
MqttClientSessionsManager sessionsManager,
MqttRetainedMessagesManager retainedMessagesManager,
+ MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options));
- _sessionsManager = sessionsManager;
+ _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
+ _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
ClientId = clientId;
_logger = logger.CreateChildLogger(nameof(MqttClientSession));
_keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger);
- _subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server);
+ _subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, eventDispatcher);
_pendingPacketsQueue = new MqttClientPendingPacketsQueue(_options, this, _logger);
}
@@ -59,9 +65,9 @@ namespace MQTTnet.Server
public void FillStatus(MqttClientSessionStatus status)
{
status.ClientId = ClientId;
- status.IsConnected = _adapter != null;
- status.Endpoint = _adapter?.Endpoint;
- status.ProtocolVersion = _adapter?.PacketFormatterAdapter?.ProtocolVersion;
+ status.IsConnected = _cancellationTokenSource != null;
+ status.Endpoint = _adapterEndpoint;
+ status.ProtocolVersion = _adapterProtocolVersion;
status.PendingApplicationMessagesCount = _pendingPacketsQueue.Count;
status.LastPacketReceived = _keepAliveMonitor.LastPacketReceived;
status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived;
@@ -73,24 +79,154 @@ namespace MQTTnet.Server
return _workerTask;
}
- private async Task RunInternalAsync(MqttApplicationMessage willMessage, int keepAlivePeriod, IMqttChannelAdapter adapter)
+ public void Stop(MqttClientDisconnectType type)
+ {
+ Stop(type, false);
+ }
+
+ public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ {
+ if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
+
+ var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(publishPacket.Topic, publishPacket.QualityOfServiceLevel);
+ if (!checkSubscriptionsResult.IsSubscribed)
+ {
+ return;
+ }
+
+ publishPacket = new MqttPublishPacket
+ {
+ Topic = publishPacket.Topic,
+ Payload = publishPacket.Payload,
+ QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel,
+ Retain = publishPacket.Retain,
+ Dup = false
+ };
+
+ if (publishPacket.QualityOfServiceLevel > 0)
+ {
+ publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
+ }
+
+ if (_options.ClientMessageQueueInterceptor != null)
+ {
+ var context = new MqttClientMessageQueueInterceptorContext(
+ senderClientSession?.ClientId,
+ ClientId,
+ publishPacket.ToApplicationMessage());
+
+ _options.ClientMessageQueueInterceptor?.Invoke(context);
+
+ if (!context.AcceptEnqueue || context.ApplicationMessage == null)
+ {
+ return;
+ }
+
+ publishPacket.Topic = context.ApplicationMessage.Topic;
+ publishPacket.Payload = context.ApplicationMessage.Payload;
+ publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel;
+ }
+
+ _pendingPacketsQueue.Enqueue(publishPacket);
+ }
+
+ public Task SubscribeAsync(IList topicFilters)
+ {
+ if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
+
+ _subscriptionsManager.Subscribe(new MqttSubscribePacket
+ {
+ TopicFilters = topicFilters
+ });
+
+ EnqueueSubscribedRetainedMessages(topicFilters);
+ return Task.FromResult(0);
+ }
+
+ public Task UnsubscribeAsync(IList topicFilters)
+ {
+ if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
+
+ _subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket
+ {
+ TopicFilters = topicFilters
+ });
+
+ return Task.FromResult(0);
+ }
+
+ public void ClearPendingApplicationMessages()
+ {
+ _pendingPacketsQueue.Clear();
+ }
+
+ public void Dispose()
+ {
+ _pendingPacketsQueue?.Dispose();
+
+ _cancellationTokenSource?.Cancel ();
+ _cancellationTokenSource?.Dispose();
+ _cancellationTokenSource = null;
+ }
+
+ private void Stop(MqttClientDisconnectType type, bool isInsideSession)
+ {
+ try
+ {
+ var cts = _cancellationTokenSource;
+ if (cts == null || cts.IsCancellationRequested)
+ {
+ return;
+ }
+
+ _cancellationTokenSource?.Cancel(false);
+
+ _wasCleanDisconnect = type == MqttClientDisconnectType.Clean;
+
+ if (_willMessage != null && !_wasCleanDisconnect)
+ {
+ _sessionsManager.EnqueueApplicationMessage(this, _willMessage.ToPublishPacket());
+ }
+
+ _willMessage = null;
+
+ if (!isInsideSession)
+ {
+ _workerTask?.GetAwaiter().GetResult();
+ }
+ }
+ finally
+ {
+ _logger.Info("Client '{0}': Disconnected (clean={1}).", ClientId, _wasCleanDisconnect);
+ _eventDispatcher.OnClientDisconnected(ClientId, _wasCleanDisconnect);
+ }
+ }
+
+ private async Task RunInternalAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
try
{
- _adapter = adapter;
+ if (_cancellationTokenSource != null)
+ {
+ Stop(MqttClientDisconnectType.Clean, true);
+ }
adapter.ReadingPacketStarted += OnAdapterReadingPacketStarted;
adapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted;
_cancellationTokenSource = new CancellationTokenSource();
- //woraround for https://github.com/dotnet/corefx/issues/24430
+ //workaround for https://github.com/dotnet/corefx/issues/24430
#pragma warning disable 4014
- _cleanupHandle = _cancellationTokenSource.Token.Register(() => CleanupAsync());
+ _cleanupHandle = _cancellationTokenSource.Token.Register(async () =>
+ {
+ await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false);
+ TryDisposeAdapter(adapter);
+ });
#pragma warning restore 4014
- //endworkaround
+ //end workaround
_wasCleanDisconnect = false;
_willMessage = willMessage;
@@ -98,6 +234,9 @@ namespace MQTTnet.Server
_pendingPacketsQueue.Start(adapter, _cancellationTokenSource.Token);
_keepAliveMonitor.Start(keepAlivePeriod, _cancellationTokenSource.Token);
+ _adapterEndpoint = adapter.Endpoint;
+ _adapterProtocolVersion = adapter.PacketSerializer.ProtocolVersion;
+
while (!_cancellationTokenSource.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationTokenSource.Token).ConfigureAwait(false);
@@ -133,34 +272,35 @@ namespace MQTTnet.Server
}
finally
{
- await CleanupAsync().ConfigureAwait(false);
+ _adapterEndpoint = null;
+ _adapterProtocolVersion = null;
+
+ // Uncomment as soon as the workaround above is no longer needed.
+ //await TryDisconnectAdapterAsync(adapter).ConfigureAwait(false);
+ //TryDisposeAdapter(adapter);
_cleanupHandle?.Dispose();
_cleanupHandle = null;
- _adapter = null;
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}
}
- private async Task CleanupAsync()
+ private void TryDisposeAdapter(IMqttChannelAdapter adapter)
{
- var adapter = _adapter;
- try
+ if (adapter == null)
{
- if (adapter == null)
- {
- return;
- }
-
- _adapter = null;
+ return;
+ }
+ try
+ {
adapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted;
adapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted;
- await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
+ adapter.Dispose();
}
catch (Exception exception)
{
@@ -231,65 +371,21 @@ namespace MQTTnet.Server
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}
- if (_options.ClientMessageQueueInterceptor != null)
+ private async Task TryDisconnectAdapterAsync(IMqttChannelAdapter adapter)
+ {
+ if (adapter == null)
{
- var context = new MqttClientMessageQueueInterceptorContext(
- senderClientSession?.ClientId,
- ClientId,
- publishPacket.ToApplicationMessage());
-
- _options.ClientMessageQueueInterceptor?.Invoke(context);
-
- if (!context.AcceptEnqueue || context.ApplicationMessage == null)
- {
- return;
- }
-
- publishPacket.Topic = context.ApplicationMessage.Topic;
- publishPacket.Payload = context.ApplicationMessage.Payload;
- publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel;
+ return;
}
- _pendingPacketsQueue.Enqueue(publishPacket);
- }
-
- public Task SubscribeAsync(IList topicFilters)
- {
- if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
-
- _subscriptionsManager.Subscribe(new MqttSubscribePacket
+ try
{
- TopicFilters = topicFilters
- });
-
- EnqueueSubscribedRetainedMessages(topicFilters);
- return Task.FromResult(0);
- }
-
- public Task UnsubscribeAsync(IList topicFilters)
- {
- if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
-
- _subscriptionsManager.Unsubscribe(new MqttUnsubscribePacket
+ await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
+ }
+ catch (Exception exception)
{
- TopicFilters = topicFilters
- });
-
- return Task.FromResult(0);
- }
-
- public void ClearPendingApplicationMessages()
- {
- _pendingPacketsQueue.Clear();
- }
-
- public void Dispose()
- {
- _pendingPacketsQueue?.Dispose();
-
- _cancellationTokenSource?.Cancel ();
- _cancellationTokenSource?.Dispose();
- _cancellationTokenSource = null;
+ _logger.Error(exception, "Error while disconnecting channel adapter.");
+ }
}
private void ProcessReceivedPacket(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
@@ -366,7 +462,12 @@ namespace MQTTnet.Server
var retainedMessages = _retainedMessagesManager.GetSubscribedMessages(topicFilters);
foreach (var applicationMessage in retainedMessages)
{
- EnqueueApplicationMessage(null, applicationMessage);
+ var publishPacket = applicationMessage.ToPublishPacket();
+
+ // Set the retain flag to true according to [MQTT-3.3.1-8].
+ publishPacket.Retain = true;
+
+ EnqueueApplicationMessage(null, publishPacket);
}
}
diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
index 0b88636..0eb6c9f 100644
--- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs
+++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
@@ -23,25 +23,29 @@ namespace MQTTnet.Server
private readonly Dictionary _sessions = new Dictionary();
private readonly CancellationToken _cancellationToken;
+ private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;
- public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, IMqttNetChildLogger logger)
+ public MqttClientSessionsManager(
+ IMqttServerOptions options,
+ MqttRetainedMessagesManager retainedMessagesManager,
+ CancellationToken cancellationToken,
+ MqttServerEventDispatcher eventDispatcher,
+ IMqttNetChildLogger logger)
{
- if (logger == null) throw new ArgumentNullException(nameof(logger));
+ _cancellationToken = cancellationToken;
+ if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager));
- _cancellationToken = cancellationToken;
+ _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
_options = options ?? throw new ArgumentNullException(nameof(options));
- Server = server ?? throw new ArgumentNullException(nameof(server));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
}
- public MqttServer Server { get; }
-
public void Start()
{
Task.Factory.StartNew(() => TryProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
@@ -62,7 +66,7 @@ namespace MQTTnet.Server
public Task StartSession(IMqttChannelAdapter clientAdapter)
{
- return Task.Run(() => RunSession(clientAdapter, _cancellationToken), _cancellationToken);
+ return Task.Run(() => RunSessionAsync(clientAdapter, _cancellationToken), _cancellationToken);
}
public IList GetClientStatus()
@@ -177,7 +181,7 @@ namespace MQTTnet.Server
applicationMessage = interceptorContext.ApplicationMessage;
}
- Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);
+ _eventDispatcher.OnApplicationMessageReceived(sender?.ClientId, applicationMessage);
if (applicationMessage.Retain)
{
@@ -186,7 +190,12 @@ namespace MQTTnet.Server
foreach (var clientSession in GetSessions())
{
- clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage);
+ var publishPacket = applicationMessage.ToPublishPacket();
+
+ // Set the retain flag to true according to [MQTT-3.3.1-9].
+ publishPacket.Retain = false;
+
+ clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, publishPacket);
}
}
catch (OperationCanceledException)
@@ -206,7 +215,7 @@ namespace MQTTnet.Server
}
}
- private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
+ private async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
@@ -239,7 +248,6 @@ namespace MQTTnet.Server
}
var result = PrepareClientSession(connectPacket);
- var clientSession = result.Session;
await clientAdapter.SendPacketAsync(
new MqttConnAckPacket
@@ -249,9 +257,10 @@ namespace MQTTnet.Server
},
cancellationToken).ConfigureAwait(false);
- Server.OnClientConnected(clientId);
+ _logger.Info("Client '{0}': Connected.", clientId);
+ _eventDispatcher.OnClientConnected(clientId);
- await clientSession.RunAsync(connectPacket.WillMessage, connectPacket.KeepAlivePeriod, clientAdapter).ConfigureAwait(false);
+ await result.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -315,7 +324,7 @@ namespace MQTTnet.Server
{
isExistingSession = false;
- clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _logger);
+ clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _retainedMessagesManager, _eventDispatcher, _logger);
_sessions[connectPacket.ClientId] = clientSession;
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
index 83ac033..38fad1f 100644
--- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
+++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
@@ -10,14 +10,14 @@ namespace MQTTnet.Server
{
private readonly Dictionary _subscriptions = new Dictionary();
private readonly IMqttServerOptions _options;
- private readonly MqttServer _server;
+ private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly string _clientId;
- public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServer server)
+ public MqttClientSubscriptionsManager(string clientId, IMqttServerOptions options, MqttServerEventDispatcher eventDispatcher)
{
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_options = options ?? throw new ArgumentNullException(nameof(options));
- _server = server;
+ _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
}
public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket)
@@ -58,7 +58,7 @@ namespace MQTTnet.Server
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
}
- _server.OnClientSubscribedTopic(_clientId, topicFilter);
+ _eventDispatcher.OnClientSubscribedTopic(_clientId, topicFilter);
}
}
@@ -75,7 +75,7 @@ namespace MQTTnet.Server
{
_subscriptions.Remove(topicFilter);
- _server.OnClientUnsubscribedTopic(_clientId, topicFilter);
+ _eventDispatcher.OnClientUnsubscribedTopic(_clientId, topicFilter);
}
}
diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs
index e0f5a53..201216a 100644
--- a/Source/MQTTnet/Server/MqttServer.cs
+++ b/Source/MQTTnet/Server/MqttServer.cs
@@ -11,6 +11,7 @@ namespace MQTTnet.Server
{
public class MqttServer : IMqttServer
{
+ private readonly MqttServerEventDispatcher _eventDispatcher = new MqttServerEventDispatcher();
private readonly ICollection _adapters;
private readonly IMqttNetChildLogger _logger;
@@ -21,10 +22,16 @@ namespace MQTTnet.Server
public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger)
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
+ _adapters = adapters.ToList();
+
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttServer));
- _adapters = adapters.ToList();
+ _eventDispatcher.ClientConnected += (s, e) => ClientConnected?.Invoke(s, e);
+ _eventDispatcher.ClientDisconnected += (s, e) => ClientDisconnected?.Invoke(s, e);
+ _eventDispatcher.ClientSubscribedTopic += (s, e) => ClientSubscribedTopic?.Invoke(s, e);
+ _eventDispatcher.ClientUnsubscribedTopic += (s, e) => ClientUnsubscribedTopic?.Invoke(s, e);
+ _eventDispatcher.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
}
public event EventHandler Started;
@@ -92,7 +99,7 @@ namespace MQTTnet.Server
_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);
- _clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _cancellationTokenSource.Token, _logger);
+ _clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
_clientSessionsManager.Start();
foreach (var adapter in _adapters)
@@ -144,33 +151,6 @@ namespace MQTTnet.Server
return _retainedMessagesManager?.ClearMessagesAsync();
}
- internal void OnClientConnected(string clientId)
- {
- _logger.Info("Client '{0}': Connected.", clientId);
- ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId));
- }
-
- internal void OnClientDisconnected(string clientId, bool wasCleanDisconnect)
- {
- _logger.Info("Client '{0}': Disconnected (clean={1}).", clientId, wasCleanDisconnect);
- ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect));
- }
-
- internal void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
- {
- ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
- }
-
- internal void OnClientUnsubscribedTopic(string clientId, string topicFilter)
- {
- ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
- }
-
- internal void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
- {
- ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));
- }
-
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
eventArgs.SessionTask = _clientSessionsManager.StartSession(eventArgs.Client);
diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs
new file mode 100644
index 0000000..8fd5652
--- /dev/null
+++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs
@@ -0,0 +1,42 @@
+using System;
+
+namespace MQTTnet.Server
+{
+ public class MqttServerEventDispatcher
+ {
+ public event EventHandler ClientSubscribedTopic;
+
+ public event EventHandler ClientUnsubscribedTopic;
+
+ public event EventHandler ClientConnected;
+
+ public event EventHandler ClientDisconnected;
+
+ public event EventHandler ApplicationMessageReceived;
+
+ public void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
+ {
+ ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
+ }
+
+ public void OnClientUnsubscribedTopic(string clientId, string topicFilter)
+ {
+ ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
+ }
+
+ public void OnClientDisconnected(string clientId, bool wasCleanDisconnect)
+ {
+ ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, wasCleanDisconnect));
+ }
+
+ public void OnApplicationMessageReceived(string senderClientId, MqttApplicationMessage applicationMessage)
+ {
+ ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
+ }
+
+ public void OnClientConnected(string clientId)
+ {
+ ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientId));
+ }
+ }
+}
diff --git a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
index 26e2415..7b632b2 100644
--- a/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
+++ b/Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
@@ -3,8 +3,9 @@
Exe
Full
- net461
- 7.2
+ net461
+ netcoreapp2.1
+ 7.2
diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs
index 414f253..181a314 100644
--- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs
@@ -1,4 +1,6 @@
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -8,6 +10,7 @@ using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
+using MQTTnet.Server;
namespace MQTTnet.Core.Tests
{
@@ -15,7 +18,7 @@ namespace MQTTnet.Core.Tests
public class MqttClientTests
{
[TestMethod]
- public async Task ClientDisconnectException()
+ public async Task Client_Disconnect_Exception()
{
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
@@ -39,9 +42,95 @@ namespace MQTTnet.Core.Tests
Assert.IsInstanceOfType(ex.InnerException, typeof(SocketException));
}
+ [TestMethod]
+ public async Task Client_Publish()
+ {
+ var server = new MqttFactory().CreateMqttServer();
+
+ try
+ {
+ var receivedMessages = new List();
+
+ await server.StartAsync(new MqttServerOptions());
+
+ var client1 = new MqttFactory().CreateMqttClient();
+ client1.ApplicationMessageReceived += (_, e) =>
+ {
+ lock (receivedMessages)
+ {
+ receivedMessages.Add(e.ApplicationMessage);
+ }
+ };
+
+ await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build());
+ await client1.SubscribeAsync("a");
+
+ var client2 = new MqttFactory().CreateMqttClient();
+ await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build());
+ var message = new MqttApplicationMessageBuilder().WithTopic("a").WithRetainFlag().Build();
+ await client2.PublishAsync(message);
+
+ await Task.Delay(500);
+
+ Assert.AreEqual(1, receivedMessages.Count);
+ Assert.IsFalse(receivedMessages.First().Retain); // Must be false even if set above!
+ }
+ finally
+ {
+ await server.StopAsync();
+ }
+ }
+
+ [TestMethod]
+ public async Task Publish_Special_Content()
+ {
+ var factory = new MqttFactory();
+ var server = factory.CreateMqttServer();
+ var serverOptions = new MqttServerOptionsBuilder().Build();
+
+ var receivedMessages = new List();
+
+ var client = factory.CreateMqttClient();
+
+ try
+ {
+ await server.StartAsync(serverOptions);
+
+ client.Connected += async (s, e) =>
+ {
+ await client.SubscribeAsync("RCU/P1/H0001/R0003");
+
+ var msg = new MqttApplicationMessageBuilder()
+ .WithPayload("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|")
+ .WithTopic("RCU/P1/H0001/R0003");
+
+ await client.PublishAsync(msg.Build());
+ };
+
+ client.ApplicationMessageReceived += (s, e) =>
+ {
+ lock (receivedMessages)
+ {
+ receivedMessages.Add(e.ApplicationMessage);
+ }
+ };
+
+ await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
+
+ await Task.Delay(500);
+
+ Assert.AreEqual(1, receivedMessages.Count);
+ Assert.AreEqual("DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|", receivedMessages.First().ConvertPayloadToString());
+ }
+ finally
+ {
+ await server.StopAsync();
+ }
+ }
+
#if DEBUG
[TestMethod]
- public async Task ClientCleanupOnAuthentificationFails()
+ public async Task Client_Cleanup_On_Authentification_Fails()
{
var channel = new TestMqttCommunicationAdapter();
var channel2 = new TestMqttCommunicationAdapter();
@@ -50,7 +139,10 @@ namespace MQTTnet.Core.Tests
Task.Run(async () => {
var connect = await channel2.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None);
- await channel2.SendPacketAsync(new MqttConnAckPacket { ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized }, CancellationToken.None);
+ await channel2.SendPacketAsync(new MqttConnAckPacket
+ {
+ ConnectReturnCode = Protocol.MqttConnectReturnCode.ConnectionRefusedNotAuthorized
+ }, CancellationToken.None);
});
var fake = new TestMqttCommunicationAdapterFactory(channel);
diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
index c1f0ed8..ade6b1b 100644
--- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
@@ -4,6 +4,7 @@ using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -227,6 +228,39 @@ namespace MQTTnet.Core.Tests
Assert.AreEqual(2000, receivedMessagesCount);
}
+
+ [TestMethod]
+ public async Task MqttServer_SessionTakeover()
+ {
+ var server = new MqttFactory().CreateMqttServer();
+ try
+ {
+ await server.StartAsync(new MqttServerOptions());
+
+ var client1 = new MqttFactory().CreateMqttClient();
+ var client2 = new MqttFactory().CreateMqttClient();
+
+ var options = new MqttClientOptionsBuilder()
+ .WithTcpServer("localhost")
+ .WithCleanSession(false)
+ .WithClientId("a").Build();
+
+ await client1.ConnectAsync(options);
+
+ await Task.Delay(500);
+
+ await client2.ConnectAsync(options);
+
+ await Task.Delay(500);
+
+ Assert.IsFalse(client1.IsConnected);
+ Assert.IsTrue(client2.IsConnected);
+ }
+ finally
+ {
+ await server.StopAsync();
+ }
+ }
private static async Task Publish(IMqttClient c1, MqttApplicationMessage message)
{
@@ -267,11 +301,6 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public async Task MqttServer_HandleCleanDisconnect()
{
- MqttNetGlobalLogger.LogMessagePublished += (_, e) =>
- {
- System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}");
- };
-
var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
@@ -304,6 +333,51 @@ namespace MQTTnet.Core.Tests
Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled);
}
+ [TestMethod]
+ public async Task MqttServer_Client_Disconnect_Without_Errors()
+ {
+ var errors = 0;
+
+ MqttNetGlobalLogger.LogMessagePublished += (_, e) =>
+ {
+ System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}");
+
+ if (e.TraceMessage.Level == MqttNetLogLevel.Error)
+ {
+ errors++;
+ }
+ };
+
+ bool clientWasConnected;
+
+ var server = new MqttFactory().CreateMqttServer();
+ try
+ {
+ var options = new MqttServerOptionsBuilder().Build();
+ await server.StartAsync(options);
+
+ var client = new MqttFactory().CreateMqttClient();
+ var clientOptions = new MqttClientOptionsBuilder()
+ .WithTcpServer("localhost")
+ .Build();
+
+ await client.ConnectAsync(clientOptions);
+
+ clientWasConnected = true;
+
+ await client.DisconnectAsync();
+
+ await Task.Delay(500);
+ }
+ finally
+ {
+ await server.StopAsync();
+ }
+
+ Assert.IsTrue(clientWasConnected);
+ Assert.AreEqual(0, errors);
+ }
+
[TestMethod]
public async Task MqttServer_LotsOfRetainedMessages()
{
@@ -389,6 +463,8 @@ namespace MQTTnet.Core.Tests
}
await c2.DisconnectAsync();
+
+ await s.StopAsync();
}
[TestMethod]
@@ -427,7 +503,7 @@ namespace MQTTnet.Core.Tests
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
- var receivedMessagesCount = 0;
+ var receivedMessages = new List();
try
{
await s.StartAsync(new MqttServerOptions());
@@ -437,7 +513,14 @@ namespace MQTTnet.Core.Tests
await c1.DisconnectAsync();
var c2 = await serverAdapter.ConnectTestClient("c2");
- c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
+ c2.ApplicationMessageReceived += (_, e) =>
+ {
+ lock (receivedMessages)
+ {
+ receivedMessages.Add(e.ApplicationMessage);
+ }
+ };
+
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
await Task.Delay(500);
@@ -447,7 +530,8 @@ namespace MQTTnet.Core.Tests
await s.StopAsync();
}
- Assert.AreEqual(1, receivedMessagesCount);
+ Assert.AreEqual(1, receivedMessages.Count);
+ Assert.IsTrue(receivedMessages.First().Retain);
}
[TestMethod]
@@ -655,26 +739,29 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
{
- var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
- var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
+ var s = new MqttFactory().CreateMqttServer();
- var connectedClient = false;
- var connecteCalledBeforeConnectedClients = false;
+ var events = new List();
s.ClientConnected += (_, __) =>
{
- connecteCalledBeforeConnectedClients |= connectedClient;
- connectedClient = true;
+ lock (events)
+ {
+ events.Add("c");
+ }
};
s.ClientDisconnected += (_, __) =>
{
- connectedClient = false;
+ lock (events)
+ {
+ events.Add("d");
+ }
};
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost")
- .WithClientId(Guid.NewGuid().ToString())
+ .WithClientId("same_id")
.Build();
await s.StartAsync(new MqttServerOptions());
@@ -684,20 +771,24 @@ namespace MQTTnet.Core.Tests
await c1.ConnectAsync(clientOptions);
- await Task.Delay(100);
+ await Task.Delay(250);
await c2.ConnectAsync(clientOptions);
- await Task.Delay(100);
+ await Task.Delay(250);
await c1.DisconnectAsync();
+
+ await Task.Delay(250);
+
await c2.DisconnectAsync();
- await s.StopAsync();
+ await Task.Delay(250);
- await Task.Delay(100);
+ await s.StopAsync();
- Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called");
+ var flow = string.Join(string.Empty, events);
+ Assert.AreEqual("cdcd", flow);
}
@@ -725,6 +816,8 @@ namespace MQTTnet.Core.Tests
await server.StartAsync(new MqttServerOptions());
var client3 = new MqttFactory().CreateMqttClient();
await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
+
+ await server.StopAsync();
}
private class TestStorage : IMqttServerStorage
diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
index 268e5fe..2bdd15f 100644
--- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
@@ -1,6 +1,4 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using MQTTnet.Adapter;
-using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
@@ -13,7 +11,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{
- var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
+ var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());
var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -28,7 +26,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
{
- var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
+ var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());
var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
@@ -43,7 +41,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
{
- var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
+ var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());
var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
@@ -59,7 +57,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{
- var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
+ var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());
var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -72,7 +70,7 @@ namespace MQTTnet.Core.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{
- var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServer(new IMqttServerAdapter[0], new MqttNetLogger().CreateChildLogger("")));
+ var sm = new MqttClientSubscriptionsManager("", new MqttServerOptions(), new MqttServerEventDispatcher());
var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
diff --git a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
index af5f321..0dd56d7 100644
--- a/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
+++ b/Tests/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj
@@ -3,7 +3,8 @@
Exe
Full
- netcoreapp2.1;net452;net461
+ netcoreapp2.1
+ $(TargetFrameworks);net452;net461
diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
index 60ce27d..563f343 100644
--- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
+++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
@@ -61,13 +61,7 @@ namespace MQTTnet.TestApp.NetCore
public class RandomPassword : IMqttClientCredentials
{
- public string Password
- {
- get
- {
- return Guid.NewGuid().ToString(); // The random password.
- }
- }
+ public string Password => Guid.NewGuid().ToString();
public string Username => "the_static_user";
}
diff --git a/appveyor.yml b/appveyor.yml
index 253e07d..43a3dd9 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -3,7 +3,7 @@ image: Visual Studio 2017
configuration: Release
before_build:
- cmd: >-
- msbuild /t:restore
+ msbuild /t:restore MQTTnet.sln
cd Tests/MQTTnet.TestApp.AspNetCore2/