From 561474c5723383866b6e1388d38826b9c292d761 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 26 Mar 2020 15:35:14 +0100 Subject: [PATCH] Fix broken unit tests. --- Build/MQTTnet.nuspec | 1 + .../MQTTnet.AspNetCore.csproj | 6 +- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 66 ++++++++++--------- .../PacketDispatcher/MqttPacketAwaiter.cs | 11 ++-- Source/MQTTnet/Server/MqttClientConnection.cs | 13 ++-- .../ManagedMqttClient_Tests.cs | 14 ++-- Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs | 48 +++++++------- 7 files changed, 84 insertions(+), 75 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index fd8614e..a6e7faf 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,6 +11,7 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. +* [All] Due to a merge issue not all changes are included in 3.0.8. All these changes are now included in this version. * [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe). * [ManagedClient] Added support for persisted sessions (thansk to @PMExtra). * [Client] Improve connection stability (thanks to @jltjohanlindqvist). diff --git a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj index c0768b4..e6f0582 100644 --- a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj +++ b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj @@ -14,11 +14,13 @@ RELEASE;NETSTANDARD2_0 - + + + - + diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index d7585cb..ccf5a33 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -1,3 +1,9 @@ +using MQTTnet.Channel; +using MQTTnet.Diagnostics; +using MQTTnet.Exceptions; +using MQTTnet.Formatter; +using MQTTnet.Internal; +using MQTTnet.Packets; using System; using System.IO; using System.Net.Sockets; @@ -5,30 +11,24 @@ using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Channel; -using MQTTnet.Diagnostics; -using MQTTnet.Exceptions; -using MQTTnet.Formatter; -using MQTTnet.Internal; -using MQTTnet.Packets; namespace MQTTnet.Adapter { public class MqttChannelAdapter : Disposable, IMqttChannelAdapter { - private const uint ErrorOperationAborted = 0x800703E3; - private const int ReadBufferSize = 4096; // TODO: Move buffer size to config + const uint ErrorOperationAborted = 0x800703E3; + const int ReadBufferSize = 4096; // TODO: Move buffer size to config - private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + readonly IMqttNetChildLogger _logger; + readonly IMqttChannel _channel; + readonly MqttPacketReader _packetReader; - private readonly IMqttNetChildLogger _logger; - private readonly IMqttChannel _channel; - private readonly MqttPacketReader _packetReader; + readonly byte[] _fixedHeaderBuffer = new byte[2]; - private readonly byte[] _fixedHeaderBuffer = new byte[2]; - - private long _bytesReceived; - private long _bytesSent; + SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + + long _bytesReceived; + long _bytesSent; public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger) { @@ -55,7 +55,7 @@ namespace MQTTnet.Adapter public Action ReadingPacketStartedCallback { get; set; } public Action ReadingPacketCompletedCallback { get; set; } - + public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -143,7 +143,7 @@ namespace MQTTnet.Adapter } finally { - _writerSemaphore.Release(); + _writerSemaphore?.Release(); } } @@ -207,7 +207,20 @@ namespace MQTTnet.Adapter Interlocked.Exchange(ref _bytesSent, 0L); } - private async Task ReceiveAsync(CancellationToken cancellationToken) + protected override void Dispose(bool disposing) + { + if (disposing) + { + _channel?.Dispose(); + + _writerSemaphore?.Dispose(); + _writerSemaphore = null; + } + + base.Dispose(disposing); + } + + async Task ReceiveAsync(CancellationToken cancellationToken) { var readFixedHeaderResult = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false); @@ -267,25 +280,14 @@ namespace MQTTnet.Adapter } } - protected override void Dispose(bool disposing) - { - if (disposing) - { - _channel?.Dispose(); - _writerSemaphore?.Dispose(); - } - - base.Dispose(disposing); - } - - private static bool IsWrappedException(Exception exception) + static bool IsWrappedException(Exception exception) { return exception is OperationCanceledException || exception is MqttCommunicationTimedOutException || exception is MqttCommunicationException; } - private static void WrapException(Exception exception) + static void WrapException(Exception exception) { if (exception is IOException && exception.InnerException is SocketException innerException) { diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs index b172290..d8f0a04 100644 --- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs +++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs @@ -1,9 +1,9 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Exceptions; +using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.PacketDispatcher { @@ -12,7 +12,7 @@ namespace MQTTnet.PacketDispatcher private readonly TaskCompletionSource _taskCompletionSource; private readonly ushort? _packetIdentifier; private readonly MqttPacketDispatcher _owningPacketDispatcher; - + public MqttPacketAwaiter(ushort? packetIdentifier, MqttPacketDispatcher owningPacketDispatcher) { _packetIdentifier = packetIdentifier; @@ -87,6 +87,7 @@ namespace MQTTnet.PacketDispatcher { _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier); } + base.Dispose(disposing); } } diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index c9e2553..5528969 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; @@ -12,6 +8,10 @@ using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Server { @@ -437,9 +437,8 @@ namespace MQTTnet.Server { _logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", ClientId); } - else if (exception is OperationCanceledException && _cancellationToken.Token.IsCancellationRequested) + else if (exception is OperationCanceledException) { - // The cancellation was triggered externally. } else { diff --git a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs index edfe058..52ec583 100644 --- a/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/ManagedMqttClient_Tests.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Options; @@ -12,6 +7,11 @@ using MQTTnet.Diagnostics; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Server; using MQTTnet.Tests.Mockups; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests { @@ -110,6 +110,8 @@ namespace MQTTnet.Tests await managedClient.StopAsync(); + await Task.Delay(500); + Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs index eb7aba3..740980d 100644 --- a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs @@ -15,35 +15,38 @@ namespace MQTTnet.Tests { var factory = new MqttFactory(); - //This test compares - //1. correct logID - string logId = "logId"; + // This test compares + // 1. correct logID + var logId = "logId"; string invalidLogId = null; - //2. if the total log calls are the same for global and local - int globalLogCount = 0; - int localLogCount = 0; + // 2. if the total log calls are the same for global and local + //var globalLogCount = 0; + var localLogCount = 0; - MqttNetLogger logger = new MqttNetLogger(logId); + var logger = new MqttNetLogger(logId); - //we have a theoretical bug here if a concurrent test is also logging - var globalLog = new EventHandler((s, e) => - { - if (logId != e.TraceMessage.LogId) - { - invalidLogId = e.TraceMessage.LogId; - } - Interlocked.Increment(ref globalLogCount); - }); + // TODO: This is commented out because it is affected by other tests. + //// we have a theoretical bug here if a concurrent test is also logging + //var globalLog = new EventHandler((s, e) => + //{ + // if (e.TraceMessage.LogId != logId) + // { + // invalidLogId = e.TraceMessage.LogId; + // } + + // Interlocked.Increment(ref globalLogCount); + //}); - MqttNetGlobalLogger.LogMessagePublished += globalLog; + //MqttNetGlobalLogger.LogMessagePublished += globalLog; logger.LogMessagePublished += (s, e) => { - if (logId != e.TraceMessage.LogId) + if (e.TraceMessage.LogId != logId) { invalidLogId = e.TraceMessage.LogId; } + Interlocked.Increment(ref localLogCount); }; @@ -54,10 +57,10 @@ namespace MQTTnet.Tests clientOptions.WithClientOptions(o => o.WithTcpServer("this_is_an_invalid_host").WithCommunicationTimeout(TimeSpan.FromSeconds(1))); - //try connect to get some log entries + // try connect to get some log entries await managedClient.StartAsync(clientOptions.Build()); - //wait at least connect timeout or we have some log messages + // wait at least connect timeout or we have some log messages var tcs = new TaskCompletionSource(); managedClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(e => tcs.TrySetResult(null)); await Task.WhenAny(Task.Delay(managedClient.Options.ClientOptions.CommunicationTimeout), tcs.Task); @@ -66,12 +69,11 @@ namespace MQTTnet.Tests { await managedClient.StopAsync(); - MqttNetGlobalLogger.LogMessagePublished -= globalLog; + //MqttNetGlobalLogger.LogMessagePublished -= globalLog; } Assert.IsNull(invalidLogId); - Assert.AreNotEqual(0, globalLogCount); - Assert.AreEqual(globalLogCount, localLogCount); + Assert.AreNotEqual(0, localLogCount); } } } \ No newline at end of file