diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index fd8614e..a6e7faf 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -11,6 +11,7 @@
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.
+* [All] Due to a merge issue not all changes are included in 3.0.8. All these changes are now included in this version.
* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
diff --git a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
index c0768b4..e6f0582 100644
--- a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
+++ b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
@@ -14,11 +14,13 @@
RELEASE;NETSTANDARD2_0
-
+
+
+
-
+
diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
index d7585cb..ccf5a33 100644
--- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
+++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
@@ -1,3 +1,9 @@
+using MQTTnet.Channel;
+using MQTTnet.Diagnostics;
+using MQTTnet.Exceptions;
+using MQTTnet.Formatter;
+using MQTTnet.Internal;
+using MQTTnet.Packets;
using System;
using System.IO;
using System.Net.Sockets;
@@ -5,30 +11,24 @@ using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
-using MQTTnet.Channel;
-using MQTTnet.Diagnostics;
-using MQTTnet.Exceptions;
-using MQTTnet.Formatter;
-using MQTTnet.Internal;
-using MQTTnet.Packets;
namespace MQTTnet.Adapter
{
public class MqttChannelAdapter : Disposable, IMqttChannelAdapter
{
- private const uint ErrorOperationAborted = 0x800703E3;
- private const int ReadBufferSize = 4096; // TODO: Move buffer size to config
+ const uint ErrorOperationAborted = 0x800703E3;
+ const int ReadBufferSize = 4096; // TODO: Move buffer size to config
- private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);
+ readonly IMqttNetChildLogger _logger;
+ readonly IMqttChannel _channel;
+ readonly MqttPacketReader _packetReader;
- private readonly IMqttNetChildLogger _logger;
- private readonly IMqttChannel _channel;
- private readonly MqttPacketReader _packetReader;
+ readonly byte[] _fixedHeaderBuffer = new byte[2];
- private readonly byte[] _fixedHeaderBuffer = new byte[2];
-
- private long _bytesReceived;
- private long _bytesSent;
+ SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);
+
+ long _bytesReceived;
+ long _bytesSent;
public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger)
{
@@ -55,7 +55,7 @@ namespace MQTTnet.Adapter
public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; }
-
+
public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -143,7 +143,7 @@ namespace MQTTnet.Adapter
}
finally
{
- _writerSemaphore.Release();
+ _writerSemaphore?.Release();
}
}
@@ -207,7 +207,20 @@ namespace MQTTnet.Adapter
Interlocked.Exchange(ref _bytesSent, 0L);
}
- private async Task ReceiveAsync(CancellationToken cancellationToken)
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _channel?.Dispose();
+
+ _writerSemaphore?.Dispose();
+ _writerSemaphore = null;
+ }
+
+ base.Dispose(disposing);
+ }
+
+ async Task ReceiveAsync(CancellationToken cancellationToken)
{
var readFixedHeaderResult = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false);
@@ -267,25 +280,14 @@ namespace MQTTnet.Adapter
}
}
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- _channel?.Dispose();
- _writerSemaphore?.Dispose();
- }
-
- base.Dispose(disposing);
- }
-
- private static bool IsWrappedException(Exception exception)
+ static bool IsWrappedException(Exception exception)
{
return exception is OperationCanceledException ||
exception is MqttCommunicationTimedOutException ||
exception is MqttCommunicationException;
}
- private static void WrapException(Exception exception)
+ static void WrapException(Exception exception)
{
if (exception is IOException && exception.InnerException is SocketException innerException)
{
diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
index b172290..d8f0a04 100644
--- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
+++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
@@ -1,9 +1,9 @@
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using MQTTnet.Exceptions;
+using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
namespace MQTTnet.PacketDispatcher
{
@@ -12,7 +12,7 @@ namespace MQTTnet.PacketDispatcher
private readonly TaskCompletionSource _taskCompletionSource;
private readonly ushort? _packetIdentifier;
private readonly MqttPacketDispatcher _owningPacketDispatcher;
-
+
public MqttPacketAwaiter(ushort? packetIdentifier, MqttPacketDispatcher owningPacketDispatcher)
{
_packetIdentifier = packetIdentifier;
@@ -87,6 +87,7 @@ namespace MQTTnet.PacketDispatcher
{
_owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier);
}
+
base.Dispose(disposing);
}
}
diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs
index c9e2553..5528969 100644
--- a/Source/MQTTnet/Server/MqttClientConnection.cs
+++ b/Source/MQTTnet/Server/MqttClientConnection.cs
@@ -1,8 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using MQTTnet.Adapter;
+using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
@@ -12,6 +8,10 @@ using MQTTnet.PacketDispatcher;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
namespace MQTTnet.Server
{
@@ -437,9 +437,8 @@ namespace MQTTnet.Server
{
_logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", ClientId);
}
- else if (exception is OperationCanceledException && _cancellationToken.Token.IsCancellationRequested)
+ else if (exception is OperationCanceledException)
{
- // The cancellation was triggered externally.
}
else
{
diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
index edfe058..52ec583 100644
--- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs
@@ -1,9 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
@@ -12,6 +7,11 @@ using MQTTnet.Diagnostics;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
namespace MQTTnet.Tests
{
@@ -110,6 +110,8 @@ namespace MQTTnet.Tests
await managedClient.StopAsync();
+ await Task.Delay(500);
+
Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
}
}
diff --git a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
index eb7aba3..740980d 100644
--- a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs
@@ -15,35 +15,38 @@ namespace MQTTnet.Tests
{
var factory = new MqttFactory();
- //This test compares
- //1. correct logID
- string logId = "logId";
+ // This test compares
+ // 1. correct logID
+ var logId = "logId";
string invalidLogId = null;
- //2. if the total log calls are the same for global and local
- int globalLogCount = 0;
- int localLogCount = 0;
+ // 2. if the total log calls are the same for global and local
+ //var globalLogCount = 0;
+ var localLogCount = 0;
- MqttNetLogger logger = new MqttNetLogger(logId);
+ var logger = new MqttNetLogger(logId);
- //we have a theoretical bug here if a concurrent test is also logging
- var globalLog = new EventHandler((s, e) =>
- {
- if (logId != e.TraceMessage.LogId)
- {
- invalidLogId = e.TraceMessage.LogId;
- }
- Interlocked.Increment(ref globalLogCount);
- });
+ // TODO: This is commented out because it is affected by other tests.
+ //// we have a theoretical bug here if a concurrent test is also logging
+ //var globalLog = new EventHandler((s, e) =>
+ //{
+ // if (e.TraceMessage.LogId != logId)
+ // {
+ // invalidLogId = e.TraceMessage.LogId;
+ // }
+
+ // Interlocked.Increment(ref globalLogCount);
+ //});
- MqttNetGlobalLogger.LogMessagePublished += globalLog;
+ //MqttNetGlobalLogger.LogMessagePublished += globalLog;
logger.LogMessagePublished += (s, e) =>
{
- if (logId != e.TraceMessage.LogId)
+ if (e.TraceMessage.LogId != logId)
{
invalidLogId = e.TraceMessage.LogId;
}
+
Interlocked.Increment(ref localLogCount);
};
@@ -54,10 +57,10 @@ namespace MQTTnet.Tests
clientOptions.WithClientOptions(o => o.WithTcpServer("this_is_an_invalid_host").WithCommunicationTimeout(TimeSpan.FromSeconds(1)));
- //try connect to get some log entries
+ // try connect to get some log entries
await managedClient.StartAsync(clientOptions.Build());
- //wait at least connect timeout or we have some log messages
+ // wait at least connect timeout or we have some log messages
var tcs = new TaskCompletionSource