diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index fd8614e..318fadd 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -21,6 +21,7 @@ * [Server] Added interceptor for unsubscriptions. * [MQTTnet.Server] Added interceptor for unsubscriptions. * [MQTTnet.AspNetCore] improved compatibility with AspNetCore 3.1 +* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol. Copyright Christian Kratky 2016-2019 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs index e8f570c..44acb77 100644 --- a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs +++ b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs @@ -1,17 +1,16 @@ -using System; -using System.Net; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.AspNetCore.Client.Tcp; -using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; +using System; +using System.Net; namespace MQTTnet.AspNetCore.Client { public class MqttClientConnectionContextFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index a052dae..8e0134c 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -1,20 +1,20 @@ -using System; -using System.Net.WebSockets; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; using MQTTnet.Server; +using System; +using System.Net.WebSockets; +using System.Threading.Tasks; namespace MQTTnet.AspNetCore { public class MqttWebSocketServerAdapter : IMqttServerAdapter { - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; - public MqttWebSocketServerAdapter(IMqttNetChildLogger logger) + public MqttWebSocketServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -38,7 +38,7 @@ namespace MQTTnet.AspNetCore if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); var endpoint = $"{httpContext.Connection.RemoteIpAddress}:{httpContext.Connection.RemotePort}"; - + var clientCertificate = await httpContext.Connection.GetClientCertificateAsync().ConfigureAwait(false); try { diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 8b5a48f..10d7d77 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Client; +using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Publishing; @@ -13,6 +8,11 @@ using MQTTnet.Exceptions; using MQTTnet.Internal; using MQTTnet.Protocol; using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Extensions.ManagedClient { @@ -33,7 +33,7 @@ namespace MQTTnet.Extensions.ManagedClient private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0); private readonly IMqttClient _mqttClient; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private readonly AsyncLock _messageQueueLock = new AsyncLock(); @@ -42,8 +42,8 @@ namespace MQTTnet.Extensions.ManagedClient private Task _maintainConnectionTask; private ManagedMqttClientStorageManager _storageManager; - - public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) + + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs index 7625fc9..1c3a9c6 100644 --- a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs +++ b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs @@ -1,15 +1,15 @@ -using System; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; +using System; namespace MQTTnet.Extensions.WebSocket4Net { public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -17,19 +17,19 @@ namespace MQTTnet.Extensions.WebSocket4Net switch (options.ChannelOptions) { case MqttClientTcpOptions _: - { - return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); - } + { + return new MqttChannelAdapter(new MqttTcpChannel(options), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } case MqttClientWebSocketOptions webSocketOptions: - { - return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); - } + { + return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options, webSocketOptions), new MqttPacketFormatterAdapter(options.ProtocolVersion), logger); + } default: - { - throw new NotSupportedException(); - } + { + throw new NotSupportedException(); + } } } } diff --git a/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs index 7f35117..656239f 100644 --- a/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs +++ b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs @@ -1,43 +1,43 @@ -using System; -using MQTTnet.Diagnostics; - -namespace MQTTnet.Server.Logging -{ - public class MqttNetChildLoggerWrapper : IMqttNetChildLogger - { - private readonly MqttNetLoggerWrapper _logger; - private readonly string _source; - - public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - - _source = source; - } - - public IMqttNetChildLogger CreateChildLogger(string source = null) - { - return _logger.CreateChildLogger(source); - } - - public void Verbose(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); - } - - public void Info(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); - } - - public void Warning(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); - } - - public void Error(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); - } - } -} +//using MQTTnet.Diagnostics; +//using System; + +//namespace MQTTnet.Server.Logging +//{ +// public class MqttNetChildLoggerWrapper : IMqttNetChildLogger +// { +// private readonly MqttNetLoggerWrapper _logger; +// private readonly string _source; + +// public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger) +// { +// _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + +// _source = source; +// } + +// public IMqttNetLogger CreateChildLogger(string source = null) +// { +// return _logger.CreateChildLogger(source); +// } + +// public void Verbose(string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); +// } + +// public void Info(string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); +// } + +// public void Warning(Exception exception, string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); +// } + +// public void Error(Exception exception, string message, params object[] parameters) +// { +// _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); +// } +// } +//} diff --git a/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs index 3f52109..668ae26 100644 --- a/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs +++ b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs @@ -1,7 +1,7 @@ -using System; -using System.Threading; -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging; using MQTTnet.Diagnostics; +using System; +using System.Threading; namespace MQTTnet.Server.Logging { @@ -16,9 +16,9 @@ namespace MQTTnet.Server.Logging public event EventHandler LogMessagePublished; - public IMqttNetChildLogger CreateChildLogger(string source = null) + public IMqttNetLogger CreateChildLogger(string source = null) { - return new MqttNetChildLoggerWrapper(source, this); + return new MqttNetLogger(source); } public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) @@ -33,8 +33,13 @@ namespace MQTTnet.Server.Logging logMessagePublishedEvent.Invoke(this, new MqttNetLogMessagePublishedEventArgs(logMessage)); } } - - private static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel) + + public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) + { + Publish(logLevel, null, message, parameters, exception); + } + + static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel) { switch (logLevel) { diff --git a/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs b/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs index cb0988d..c9aba6f 100644 --- a/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs +++ b/Source/MQTTnet/Adapter/IMqttClientAdapterFactory.cs @@ -5,6 +5,6 @@ namespace MQTTnet.Adapter { public interface IMqttClientAdapterFactory { - IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger); + IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger); } } diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index d7585cb..5dd0281 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -1,3 +1,9 @@ +using MQTTnet.Channel; +using MQTTnet.Diagnostics; +using MQTTnet.Exceptions; +using MQTTnet.Formatter; +using MQTTnet.Internal; +using MQTTnet.Packets; using System; using System.IO; using System.Net.Sockets; @@ -5,32 +11,26 @@ using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Channel; -using MQTTnet.Diagnostics; -using MQTTnet.Exceptions; -using MQTTnet.Formatter; -using MQTTnet.Internal; -using MQTTnet.Packets; namespace MQTTnet.Adapter { - public class MqttChannelAdapter : Disposable, IMqttChannelAdapter + public sealed class MqttChannelAdapter : Disposable, IMqttChannelAdapter { - private const uint ErrorOperationAborted = 0x800703E3; - private const int ReadBufferSize = 4096; // TODO: Move buffer size to config + const uint ErrorOperationAborted = 0x800703E3; + const int ReadBufferSize = 4096; // TODO: Move buffer size to config - private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); - private readonly IMqttNetChildLogger _logger; - private readonly IMqttChannel _channel; - private readonly MqttPacketReader _packetReader; + readonly IMqttNetLogger _logger; + readonly IMqttChannel _channel; + readonly MqttPacketReader _packetReader; - private readonly byte[] _fixedHeaderBuffer = new byte[2]; - - private long _bytesReceived; - private long _bytesSent; + readonly byte[] _fixedHeaderBuffer = new byte[2]; - public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger) + long _bytesReceived; + long _bytesSent; + + public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -55,7 +55,7 @@ namespace MQTTnet.Adapter public Action ReadingPacketStartedCallback { get; set; } public Action ReadingPacketCompletedCallback { get; set; } - + public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -207,7 +207,18 @@ namespace MQTTnet.Adapter Interlocked.Exchange(ref _bytesSent, 0L); } - private async Task ReceiveAsync(CancellationToken cancellationToken) + protected override void Dispose(bool disposing) + { + if (disposing) + { + _channel?.Dispose(); + _writerSemaphore?.Dispose(); + } + + base.Dispose(disposing); + } + + async Task ReceiveAsync(CancellationToken cancellationToken) { var readFixedHeaderResult = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false); @@ -267,25 +278,14 @@ namespace MQTTnet.Adapter } } - protected override void Dispose(bool disposing) - { - if (disposing) - { - _channel?.Dispose(); - _writerSemaphore?.Dispose(); - } - - base.Dispose(disposing); - } - - private static bool IsWrappedException(Exception exception) + static bool IsWrappedException(Exception exception) { return exception is OperationCanceledException || exception is MqttCommunicationTimedOutException || exception is MqttCommunicationException; } - private static void WrapException(Exception exception) + static void WrapException(Exception exception) { if (exception is IOException && exception.InnerException is SocketException innerException) { diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 29687fc..2546fc1 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -1,7 +1,3 @@ -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; @@ -17,6 +13,10 @@ using MQTTnet.Internal; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Client { @@ -29,7 +29,7 @@ namespace MQTTnet.Client private readonly object _disconnectLock = new object(); private readonly IMqttClientAdapterFactory _adapterFactory; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private CancellationTokenSource _backgroundCancellationTokenSource; private Task _packetReceiverTask; diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs index 4fd0ccf..b301819 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs @@ -1,19 +1,19 @@ -using System; +using MQTTnet.Client.ExtendedAuthenticationExchange; +using MQTTnet.Formatter; +using System; using System.Linq; using System.Text; -using MQTTnet.Client.ExtendedAuthenticationExchange; -using MQTTnet.Formatter; namespace MQTTnet.Client.Options { public class MqttClientOptionsBuilder { - private readonly MqttClientOptions _options = new MqttClientOptions(); + readonly MqttClientOptions _options = new MqttClientOptions(); - private MqttClientTcpOptions _tcpOptions; - private MqttClientWebSocketOptions _webSocketOptions; - private MqttClientOptionsBuilderTlsParameters _tlsParameters; - private MqttClientWebSocketProxyOptions _proxyOptions; + MqttClientTcpOptions _tcpOptions; + MqttClientWebSocketOptions _webSocketOptions; + MqttClientOptionsBuilderTlsParameters _tlsParameters; + MqttClientWebSocketProxyOptions _proxyOptions; public MqttClientOptionsBuilder WithProtocolVersion(MqttProtocolVersion value) { diff --git a/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs b/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs deleted file mode 100644 index df99f44..0000000 --- a/Source/MQTTnet/Diagnostics/IMqttNetChildLogger.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; - -namespace MQTTnet.Diagnostics -{ - public interface IMqttNetChildLogger - { - IMqttNetChildLogger CreateChildLogger(string source = null); - - void Verbose(string message, params object[] parameters); - - void Info(string message, params object[] parameters); - - void Warning(Exception exception, string message, params object[] parameters); - - void Error(Exception exception, string message, params object[] parameters); - } -} diff --git a/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs b/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs index 73b6fcb..40bb04b 100644 --- a/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs +++ b/Source/MQTTnet/Diagnostics/IMqttNetLogger.cs @@ -6,8 +6,8 @@ namespace MQTTnet.Diagnostics { event EventHandler LogMessagePublished; - IMqttNetChildLogger CreateChildLogger(string source = null); + IMqttNetLogger CreateChildLogger(string source = null); - void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception); + void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception); } } diff --git a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs b/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs deleted file mode 100644 index 3733454..0000000 --- a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; - -namespace MQTTnet.Diagnostics -{ - public class MqttNetChildLogger : IMqttNetChildLogger - { - private readonly IMqttNetLogger _logger; - private readonly string _source; - - public MqttNetChildLogger(IMqttNetLogger logger, string source) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _source = source; - } - - public IMqttNetChildLogger CreateChildLogger(string source) - { - string childSource; - if (!string.IsNullOrEmpty(_source)) - { - childSource = _source + "." + source; - } - else - { - childSource = source; - } - - return new MqttNetChildLogger(_logger, childSource); - } - - public void Verbose(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); - } - - public void Info(string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); - } - - public void Warning(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); - } - - public void Error(Exception exception, string message, params object[] parameters) - { - _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); - } - } -} diff --git a/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs b/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs index 8a0c0fc..1b3488e 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetLogMessagePublishedEventArgs.cs @@ -7,8 +7,12 @@ namespace MQTTnet.Diagnostics public MqttNetLogMessagePublishedEventArgs(MqttNetLogMessage logMessage) { TraceMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage)); + LogMessage = logMessage ?? throw new ArgumentNullException(nameof(logMessage)); } + [Obsolete("Use new proeprty LogMessage instead.")] public MqttNetLogMessage TraceMessage { get; } + + public MqttNetLogMessage LogMessage { get; } } } diff --git a/Source/MQTTnet/Diagnostics/MqttNetLogger.cs b/Source/MQTTnet/Diagnostics/MqttNetLogger.cs index 56a664e..b14bf34 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetLogger.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetLogger.cs @@ -4,21 +4,27 @@ namespace MQTTnet.Diagnostics { public class MqttNetLogger : IMqttNetLogger { - private readonly string _logId; + readonly string _logId; + readonly string _source; - public MqttNetLogger(string logId = null) + public MqttNetLogger(string source, string logId = null) { + _source = source; _logId = logId; } + public MqttNetLogger() + { + } + public event EventHandler LogMessagePublished; - public IMqttNetChildLogger CreateChildLogger(string source = null) + public IMqttNetLogger CreateChildLogger(string source = null) { - return new MqttNetChildLogger(this, source); + return new MqttNetLogger(source, _logId); } - public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) + public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) { var hasLocalListeners = LogMessagePublished != null; var hasGlobalListeners = MqttNetGlobalLogger.HasListeners; @@ -40,7 +46,7 @@ namespace MQTTnet.Diagnostics } } - var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, source, logLevel, message, exception); + var traceMessage = new MqttNetLogMessage(_logId, DateTime.UtcNow, Environment.CurrentManagedThreadId, _source, logLevel, message, exception); if (hasGlobalListeners) { diff --git a/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs b/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs new file mode 100644 index 0000000..bc8a97b --- /dev/null +++ b/Source/MQTTnet/Diagnostics/MqttNetLoggerExtensions.cs @@ -0,0 +1,35 @@ +using System; + +namespace MQTTnet.Diagnostics +{ + public static class MqttNetLoggerExtensions + { + public static void Verbose(this IMqttNetLogger logger, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null); + } + + public static void Info(this IMqttNetLogger logger, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Info, message, parameters, null); + } + + public static void Warning(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception); + } + + public static void Error(this IMqttNetLogger logger, Exception exception, string message, params object[] parameters) + { + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + logger.Publish(MqttNetLogLevel.Error, message, parameters, exception); + } + } +} diff --git a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs b/Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs similarity index 89% rename from Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs rename to Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs index efbf08b..7135af6 100644 --- a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs +++ b/Source/MQTTnet/Diagnostics/TargetFrameworkProvider.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Diagnostics { - public static class TargetFrameworkInfoProvider + public static class TargetFrameworkProvider { public static string TargetFramework { diff --git a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs index 2377cde..148fe04 100644 --- a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs +++ b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs @@ -1,17 +1,17 @@ -using System; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Formatter; +using System; namespace MQTTnet.Implementations { public class MqttClientAdapterFactory : IMqttClientAdapterFactory { - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { if (options == null) throw new ArgumentNullException(nameof(options)); - + switch (options.ChannelOptions) { case MqttClientTcpOptions _: diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs index 3b24bd1..4d26da7 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -11,14 +11,14 @@ using System.Threading.Tasks; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : IMqttServerAdapter + public sealed class MqttTcpServerAdapter : IMqttServerAdapter { - private readonly IMqttNetChildLogger _logger; + readonly IMqttNetLogger _logger; - private IMqttServerOptions _options; - private StreamSocketListener _listener; + IMqttServerOptions _options; + StreamSocketListener _listener; - public MqttTcpServerAdapter(IMqttNetChildLogger logger) + public MqttTcpServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -68,7 +68,7 @@ namespace MQTTnet.Implementations _listener = null; } - private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) + async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) { try { diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 501c4da..22a5824 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -1,4 +1,8 @@ #if !WINDOWS_UWP +using MQTTnet.Adapter; +using MQTTnet.Diagnostics; +using MQTTnet.Internal; +using MQTTnet.Server; using System; using System.Collections.Generic; using System.Net; @@ -6,21 +10,17 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Adapter; -using MQTTnet.Diagnostics; -using MQTTnet.Internal; -using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter + public sealed class MqttTcpServerAdapter : Disposable, IMqttServerAdapter { - private readonly List _listeners = new List(); - private readonly IMqttNetChildLogger _logger; + readonly List _listeners = new List(); + readonly IMqttNetLogger _logger; - private CancellationTokenSource _cancellationTokenSource; + CancellationTokenSource _cancellationTokenSource; - public MqttTcpServerAdapter(IMqttNetChildLogger logger) + public MqttTcpServerAdapter(IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -59,7 +59,7 @@ namespace MQTTnet.Implementations { tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate, options.TlsEndpointOptions.CertificateCredentials.Password); } - + if (!tlsCertificate.HasPrivateKey) { throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); @@ -77,7 +77,17 @@ namespace MQTTnet.Implementations return Task.FromResult(0); } - private void Cleanup() + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + + base.Dispose(disposing); + } + + void Cleanup() { _cancellationTokenSource?.Cancel(false); _cancellationTokenSource?.Dispose(); @@ -91,16 +101,7 @@ namespace MQTTnet.Implementations _listeners.Clear(); } - protected override void Dispose(bool disposing) - { - if (disposing) - { - Cleanup(); - } - base.Dispose(disposing); - } - - private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) + void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) { if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) { diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index f2f439e..84cd7b9 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -1,4 +1,9 @@ #if !WINDOWS_UWP +using MQTTnet.Adapter; +using MQTTnet.Diagnostics; +using MQTTnet.Formatter; +using MQTTnet.Internal; +using MQTTnet.Server; using System; using System.IO; using System.Net; @@ -7,21 +12,16 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Adapter; -using MQTTnet.Diagnostics; -using MQTTnet.Formatter; -using MQTTnet.Internal; -using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerListener : IDisposable + public sealed class MqttTcpServerListener : IDisposable { - private readonly IMqttNetChildLogger _logger; - private readonly AddressFamily _addressFamily; - private readonly MqttServerTcpEndpointBaseOptions _options; - private readonly MqttServerTlsTcpEndpointOptions _tlsOptions; - private readonly X509Certificate2 _tlsCertificate; + readonly IMqttNetLogger _logger; + readonly AddressFamily _addressFamily; + readonly MqttServerTcpEndpointBaseOptions _options; + readonly MqttServerTlsTcpEndpointOptions _tlsOptions; + readonly X509Certificate2 _tlsCertificate; private Socket _socket; private IPEndPoint _localEndPoint; @@ -30,7 +30,7 @@ namespace MQTTnet.Implementations AddressFamily addressFamily, MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { _addressFamily = addressFamily; _options = options; @@ -67,12 +67,12 @@ namespace MQTTnet.Implementations { _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); } - + if (_options.NoDelay) { _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); } - + _socket.Bind(_localEndPoint); _socket.Listen(_options.ConnectionBacklog); @@ -87,7 +87,7 @@ namespace MQTTnet.Implementations throw; } - _logger.Warning(exception,"Error while creating listener socket for local end point '{0}'.", _localEndPoint); + _logger.Warning(exception, "Error while creating listener socket for local end point '{0}'.", _localEndPoint); return false; } } @@ -101,7 +101,7 @@ namespace MQTTnet.Implementations #endif } - private async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken) + async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { @@ -116,7 +116,7 @@ namespace MQTTnet.Implementations Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken).Forget(_logger); } catch (OperationCanceledException) - { + { } catch (Exception exception) { @@ -128,14 +128,14 @@ namespace MQTTnet.Implementations continue; } } - + _logger.Error(exception, $"Error while accepting connection at TCP listener {_localEndPoint} TLS={_tlsCertificate != null}."); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } - private async Task TryHandleClientConnectionAsync(Socket clientSocket) + async Task TryHandleClientConnectionAsync(Socket clientSocket) { Stream stream = null; string remoteEndPoint = null; @@ -160,9 +160,9 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(stream, false, _tlsOptions.RemoteCertificateValidationCallback); await sslStream.AuthenticateAsServerAsync( - _tlsCertificate, - _tlsOptions.ClientCertificateRequired, - _tlsOptions.SslProtocol, + _tlsCertificate, + _tlsOptions.ClientCertificateRequired, + _tlsOptions.SslProtocol, _tlsOptions.CheckCertificateRevocation).ConfigureAwait(false); stream = sslStream; diff --git a/Source/MQTTnet/Internal/TaskExtensions.cs b/Source/MQTTnet/Internal/TaskExtensions.cs index 733631c..12a8c07 100644 --- a/Source/MQTTnet/Internal/TaskExtensions.cs +++ b/Source/MQTTnet/Internal/TaskExtensions.cs @@ -1,11 +1,11 @@ -using System.Threading.Tasks; -using MQTTnet.Diagnostics; +using MQTTnet.Diagnostics; +using System.Threading.Tasks; namespace MQTTnet.Internal { public static class TaskExtensions { - public static void Forget(this Task task, IMqttNetChildLogger logger) + public static void Forget(this Task task, IMqttNetLogger logger) { task?.ContinueWith(t => { diff --git a/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs b/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs new file mode 100644 index 0000000..1734bf4 --- /dev/null +++ b/Source/MQTTnet/LowLevelClient/ILowLevelMqttClient.cs @@ -0,0 +1,19 @@ +using MQTTnet.Client.Options; +using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.LowLevelClient +{ + public interface ILowLevelMqttClient : IDisposable + { + Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken); + + Task DisconnectAsync(CancellationToken cancellationToken); + + Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken); + + Task ReceiveAsync(CancellationToken cancellationToken); + } +} diff --git a/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs b/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs new file mode 100644 index 0000000..045100d --- /dev/null +++ b/Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs @@ -0,0 +1,128 @@ +using MQTTnet.Adapter; +using MQTTnet.Client.Options; +using MQTTnet.Diagnostics; +using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.LowLevelClient +{ + public sealed class LowLevelMqttClient : ILowLevelMqttClient + { + readonly IMqttNetLogger _logger; + readonly IMqttClientAdapterFactory _clientAdapterFactory; + + IMqttChannelAdapter _adapter; + IMqttClientOptions _options; + + public LowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory, IMqttNetLogger logger) + { + if (clientAdapterFactory is null) throw new ArgumentNullException(nameof(clientAdapterFactory)); + if (logger is null) throw new ArgumentNullException(nameof(logger)); + + _clientAdapterFactory = clientAdapterFactory; + _logger = logger.CreateChildLogger(nameof(LowLevelMqttClient)); + } + + bool IsConnected => _adapter != null; + + public async Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken) + { + if (options is null) throw new ArgumentNullException(nameof(options)); + + if (_adapter != null) + { + throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again."); + } + + var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger); + + try + { + _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout})."); + await newAdapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + _logger.Verbose("Connection with server established."); + + _options = options; + } + catch (Exception) + { + _adapter.Dispose(); + throw; + } + + _adapter = newAdapter; + } + + public async Task DisconnectAsync(CancellationToken cancellationToken) + { + if (_adapter == null) + { + return; + } + + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + _adapter = null; + } + + public async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { + if (packet is null) throw new ArgumentNullException(nameof(packet)); + + if (_adapter == null) + { + throw new InvalidOperationException("Low level MQTT client is not connected."); + } + + try + { + await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + throw; + } + } + + public async Task ReceiveAsync(CancellationToken cancellationToken) + { + if (_adapter == null) + { + throw new InvalidOperationException("Low level MQTT client is not connected."); + } + + try + { + return await _adapter.ReceivePacketAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + await SafeDisconnect(cancellationToken).ConfigureAwait(false); + throw; + } + } + + public void Dispose() + { + _adapter?.Dispose(); + } + + async Task SafeDisconnect(CancellationToken cancellationToken) + { + try + { + await _adapter.DisconnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while disconnecting."); + } + finally + { + _adapter.Dispose(); + } + } + } +} diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index cb835f6..da85cba 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -1,18 +1,19 @@ -using System; -using System.Collections.Generic; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Implementations; +using MQTTnet.LowLevelClient; using MQTTnet.Server; +using System; +using System.Collections.Generic; namespace MQTTnet { - public class MqttFactory : IMqttFactory + public sealed class MqttFactory : IMqttFactory { - private IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory(); + IMqttClientAdapterFactory _clientAdapterFactory = new MqttClientAdapterFactory(); - public MqttFactory() : this(new MqttNetLogger()) + public MqttFactory() : this(new MqttNetLogger(null, null)) { } @@ -29,6 +30,33 @@ namespace MQTTnet return this; } + public ILowLevelMqttClient CreateLowLevelMqttClient() + { + return CreateLowLevelMqttClient(DefaultLogger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + return new LowLevelMqttClient(_clientAdapterFactory, logger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory) + { + if (clientAdapterFactory == null) throw new ArgumentNullException(nameof(clientAdapterFactory)); + + return new LowLevelMqttClient(_clientAdapterFactory, DefaultLogger); + } + + public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactoryy) + { + if (logger == null) throw new ArgumentNullException(nameof(logger)); + if (clientAdapterFactoryy == null) throw new ArgumentNullException(nameof(clientAdapterFactoryy)); + + return new LowLevelMqttClient(_clientAdapterFactory, logger); + } + public IMqttClient CreateMqttClient() { return CreateMqttClient(DefaultLogger); @@ -79,7 +107,7 @@ namespace MQTTnet public IMqttServer CreateMqttServer(IEnumerable serverAdapters) { if (serverAdapters == null) throw new ArgumentNullException(nameof(serverAdapters)); - + return new MqttServer(serverAdapters, DefaultLogger.CreateChildLogger()); } } diff --git a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs index 6ffdd2b..ab1d38c 100644 --- a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs @@ -1,12 +1,12 @@ -using System.Collections.Generic; +using MQTTnet.Diagnostics; +using System.Collections.Generic; using System.Threading.Tasks; -using MQTTnet.Diagnostics; namespace MQTTnet.Server { public interface IMqttRetainedMessagesManager { - Task Start(IMqttServerOptions options, IMqttNetChildLogger logger); + Task Start(IMqttServerOptions options, IMqttNetLogger logger); Task LoadMessagesAsync(); diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index c9e2553..a21873b 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; @@ -12,6 +8,10 @@ using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Server { @@ -25,7 +25,7 @@ namespace MQTTnet.Server private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; private readonly MqttClientSessionsManager _sessionsManager; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private readonly IMqttServerOptions _serverOptions; private readonly IMqttChannelAdapter _channelAdapter; @@ -49,7 +49,7 @@ namespace MQTTnet.Server IMqttServerOptions serverOptions, MqttClientSessionsManager sessionsManager, IMqttRetainedMessagesManager retainedMessagesManager, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index deefe4c..9d17552 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -1,9 +1,9 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Internal; +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Internal; namespace MQTTnet.Server { @@ -13,15 +13,15 @@ namespace MQTTnet.Server private readonly string _clientId; private readonly Func _keepAliveElapsedCallback; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private bool _isPaused; - public MqttClientKeepAliveMonitor(string clientId, Func keepAliveElapsedCallback, IMqttNetChildLogger logger) + public MqttClientKeepAliveMonitor(string clientId, Func keepAliveElapsedCallback, IMqttNetLogger logger) { _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); _keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback)); - + if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor)); } @@ -32,7 +32,7 @@ namespace MQTTnet.Server { return; } - + Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger); } diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index d097b9f..e0d2175 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -1,18 +1,18 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Server.Status; +using System; using System.Collections.Generic; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Server.Status; namespace MQTTnet.Server { public class MqttClientSession { - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private readonly DateTime _createdTimestamp = DateTime.UtcNow; - public MqttClientSession(string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetChildLogger logger) + public MqttClientSession(string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttNetLogger logger) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); Items = items ?? throw new ArgumentNullException(nameof(items)); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 28c163d..dcbf5a4 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -1,15 +1,15 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Adapter; +using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Server { @@ -27,14 +27,14 @@ namespace MQTTnet.Server private readonly IMqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttServerOptions _options; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; public MqttClientSessionsManager( IMqttServerOptions options, IMqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, MqttServerEventDispatcher eventDispatcher, - IMqttNetChildLogger logger) + IMqttNetLogger logger) { _cancellationToken = cancellationToken; diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs index f4ebe48..f6a994b 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs @@ -1,10 +1,10 @@ -using System; +using MQTTnet.Diagnostics; +using MQTTnet.Implementations; +using MQTTnet.Internal; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using MQTTnet.Diagnostics; -using MQTTnet.Implementations; -using MQTTnet.Internal; namespace MQTTnet.Server { @@ -14,10 +14,10 @@ namespace MQTTnet.Server private readonly AsyncLock _messagesLock = new AsyncLock(); private readonly Dictionary _messages = new Dictionary(); - private IMqttNetChildLogger _logger; + private IMqttNetLogger _logger; private IMqttServerOptions _options; - public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger) + public Task Start(IMqttServerOptions options, IMqttNetLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 4c5ab62..0fa6b4c 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -17,13 +17,13 @@ namespace MQTTnet.Server { private readonly MqttServerEventDispatcher _eventDispatcher; private readonly ICollection _adapters; - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; private MqttClientSessionsManager _clientSessionsManager; private IMqttRetainedMessagesManager _retainedMessagesManager; private CancellationTokenSource _cancellationTokenSource; - public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger) + public MqttServer(IEnumerable adapters, IMqttNetLogger logger) { if (adapters == null) throw new ArgumentNullException(nameof(adapters)); _adapters = adapters.ToList(); diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index e6e608a..3eb6b85 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -1,15 +1,15 @@ -using System; -using System.Threading.Tasks; -using MQTTnet.Client.Receiving; +using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; +using System; +using System.Threading.Tasks; namespace MQTTnet.Server { public class MqttServerEventDispatcher { - private readonly IMqttNetChildLogger _logger; + private readonly IMqttNetLogger _logger; - public MqttServerEventDispatcher(IMqttNetChildLogger logger) + public MqttServerEventDispatcher(IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } diff --git a/Source/MQTTnet/TopicFilter.cs b/Source/MQTTnet/TopicFilter.cs index 3b62e7e..b086b48 100644 --- a/Source/MQTTnet/TopicFilter.cs +++ b/Source/MQTTnet/TopicFilter.cs @@ -2,6 +2,7 @@ namespace MQTTnet { + // TODO: Consider renaming to "MqttTopicFilter" public class TopicFilter { public string Topic { get; set; } @@ -26,16 +27,16 @@ namespace MQTTnet public override string ToString() { return string.Concat( - "TopicFilter: [Topic=", + "TopicFilter: [Topic=", Topic, - "] [QualityOfServiceLevel=", + "] [QualityOfServiceLevel=", QualityOfServiceLevel, - "] [NoLocal=", - NoLocal, - "] [RetainAsPublished=", - RetainAsPublished, - "] [RetainHandling=", - RetainHandling, + "] [NoLocal=", + NoLocal, + "] [RetainAsPublished=", + RetainAsPublished, + "] [RetainHandling=", + RetainHandling, "]"); } } diff --git a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs index cfc88d4..9be84bd 100644 --- a/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/LoggerBenchmark.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Benchmarks public class LoggerBenchmark { private IMqttNetLogger _logger; - private IMqttNetChildLogger _childLogger; + private IMqttNetLogger _childLogger; private bool _useHandler; [GlobalSetup] diff --git a/Tests/MQTTnet.Benchmarks/Program.cs b/Tests/MQTTnet.Benchmarks/Program.cs index 280416d..760cb52 100644 --- a/Tests/MQTTnet.Benchmarks/Program.cs +++ b/Tests/MQTTnet.Benchmarks/Program.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Benchmarks { public static void Main(string[] args) { - Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkInfoProvider.TargetFramework}"); + Console.WriteLine($"MQTTnet - BenchmarkApp.{TargetFrameworkProvider.TargetFramework}"); Console.WriteLine("1 = MessageProcessingBenchmark"); Console.WriteLine("2 = SerializerBenchmark"); Console.WriteLine("3 = LoggerBenchmark"); diff --git a/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs new file mode 100644 index 0000000..05440de --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs @@ -0,0 +1,111 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client.Options; +using MQTTnet.LowLevelClient; +using MQTTnet.Packets; +using MQTTnet.Protocol; +using MQTTnet.Tests.Mockups; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Tests +{ + [TestClass] + public class LowLevelMqttClient_Tests + { + public TestContext TestContext { get; set; } + + [TestMethod] + public async Task Connect_And_Disconnect() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + await lowLevelClient.DisconnectAsync(CancellationToken.None); + } + } + + [TestMethod] + public async Task Authenticate() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + var receivedPacket = await Authenticate(lowLevelClient).ConfigureAwait(false); + + await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false); + + Assert.IsNotNull(receivedPacket); + Assert.AreEqual(MqttConnectReturnCode.ConnectionAccepted, receivedPacket.ReturnCode); + } + } + + [TestMethod] + public async Task Subscribe() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var server = await testEnvironment.StartServerAsync(); + + var factory = new MqttFactory(); + var lowLevelClient = factory.CreateLowLevelMqttClient(); + + await lowLevelClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build(), CancellationToken.None); + + await Authenticate(lowLevelClient).ConfigureAwait(false); + + var receivedPacket = await Subscribe(lowLevelClient, "a").ConfigureAwait(false); + + await lowLevelClient.DisconnectAsync(CancellationToken.None).ConfigureAwait(false); + + Assert.IsNotNull(receivedPacket); + Assert.AreEqual(MqttSubscribeReturnCode.SuccessMaximumQoS0, receivedPacket.ReturnCodes[0]); + } + } + + async Task Authenticate(ILowLevelMqttClient client) + { + await client.SendAsync(new MqttConnectPacket() + { + CleanSession = true, + ClientId = TestContext.TestName, + Username = "user", + Password = Encoding.UTF8.GetBytes("pass") + }, + CancellationToken.None).ConfigureAwait(false); + + return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttConnAckPacket; + } + + async Task Subscribe(ILowLevelMqttClient client, string topic) + { + await client.SendAsync(new MqttSubscribePacket + { + PacketIdentifier = 1, + TopicFilters = new List + { + new TopicFilter + { + Topic = topic + } + } + }, + CancellationToken.None).ConfigureAwait(false); + + return await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false) as MqttSubAckPacket; + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs index 2500a6f..103142e 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestClientWrapper.cs @@ -1,6 +1,4 @@ -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; @@ -10,10 +8,12 @@ using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Client.Subscribing; using MQTTnet.Client.Unsubscribing; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestClientWrapper : IMqttClient + public sealed class TestClientWrapper : IMqttClient { public TestClientWrapper(IMqttClient implementation, TestContext testContext) { @@ -22,6 +22,7 @@ namespace MQTTnet.Tests.Mockups } public IMqttClient Implementation { get; } + public TestContext TestContext { get; } public bool IsConnected => Implementation.IsConnected; @@ -34,28 +35,32 @@ namespace MQTTnet.Tests.Mockups public Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken) { - switch (options) + if (TestContext != null) { - case MqttClientOptionsBuilder builder: - { - var existingClientId = builder.Build().ClientId; - if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + switch (options) + { + case MqttClientOptionsBuilder builder: { - builder.WithClientId(TestContext.TestName + existingClientId); + var existingClientId = builder.Build().ClientId; + if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + { + builder.WithClientId(TestContext.TestName + existingClientId); + } + + break; } - } - break; - case MqttClientOptions op: - { - var existingClientId = op.ClientId; - if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + + case MqttClientOptions op: { - op.ClientId = TestContext.TestName + existingClientId; + var existingClientId = op.ClientId; + if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName)) + { + op.ClientId = TestContext.TestName + existingClientId; + } + + break; } - } - break; - default: - break; + } } return Implementation.ConnectAsync(options, cancellationToken); @@ -81,7 +86,7 @@ namespace MQTTnet.Tests.Mockups return Implementation.SendExtendedAuthenticationExchangeDataAsync(data, cancellationToken); } - public Task SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken) + public Task SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken) { return Implementation.SubscribeAsync(options, cancellationToken); } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index 7f2dcac..be99534 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -1,27 +1,27 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Client.Options; using MQTTnet.Diagnostics; using MQTTnet.Internal; using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestEnvironment : Disposable + public sealed class TestEnvironment : Disposable { - private readonly MqttFactory _mqttFactory = new MqttFactory(); - private readonly List _clients = new List(); - private readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server"); - private readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client"); + readonly MqttFactory _mqttFactory = new MqttFactory(); + readonly List _clients = new List(); + readonly IMqttNetLogger _serverLogger = new MqttNetLogger("server"); + readonly IMqttNetLogger _clientLogger = new MqttNetLogger("client"); - private readonly List _serverErrors = new List(); - private readonly List _clientErrors = new List(); + readonly List _serverErrors = new List(); + readonly List _clientErrors = new List(); - private readonly List _exceptions = new List(); + readonly List _exceptions = new List(); public IMqttServer Server { get; private set; } @@ -37,36 +37,42 @@ namespace MQTTnet.Tests.Mockups public TestContext TestContext { get; } + public TestEnvironment() : this(null) + { + } + public TestEnvironment(TestContext testContext) { + TestContext = testContext; + _serverLogger.LogMessagePublished += (s, e) => { - if (e.TraceMessage.Level == MqttNetLogLevel.Error) + if (e.LogMessage.Level == MqttNetLogLevel.Error) { lock (_serverErrors) { - _serverErrors.Add(e.TraceMessage.ToString()); + _serverErrors.Add(e.LogMessage.ToString()); } } }; _clientLogger.LogMessagePublished += (s, e) => { - lock (_clientErrors) + if (e.LogMessage.Level == MqttNetLogLevel.Error) { - if (e.TraceMessage.Level == MqttNetLogLevel.Error) + lock (_clientErrors) { - _clientErrors.Add(e.TraceMessage.ToString()); + _clientErrors.Add(e.LogMessage.ToString()); } } }; - TestContext = testContext; } public IMqttClient CreateClient() { var client = _mqttFactory.CreateMqttClient(_clientLogger); _clients.Add(client); + return new TestClientWrapper(client, TestContext); } @@ -90,15 +96,17 @@ namespace MQTTnet.Tests.Mockups public Task ConnectClientAsync() { - return ConnectClientAsync(new MqttClientOptionsBuilder() ); + return ConnectClientAsync(new MqttClientOptionsBuilder()); } public async Task ConnectClientAsync(MqttClientOptionsBuilder options) { if (options == null) throw new ArgumentNullException(nameof(options)); + options = options.WithTcpServer("localhost", ServerPort); + var client = CreateClient(); - await client.ConnectAsync(options.WithTcpServer("localhost", ServerPort).Build()); + await client.ConnectAsync(options.Build()); return client; } @@ -150,6 +158,7 @@ namespace MQTTnet.Tests.Mockups throw new Exception($"{_exceptions.Count} exceptions tracked.\r\n" + string.Join(Environment.NewLine, _exceptions)); } } + base.Dispose(disposing); } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs index 79607d9..46471f8 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs @@ -1,20 +1,15 @@ -using System; -using MQTTnet.Diagnostics; +using MQTTnet.Diagnostics; +using System; namespace MQTTnet.Tests.Mockups { - public class TestLogger : IMqttNetLogger, IMqttNetChildLogger + public class TestLogger : IMqttNetLogger { public event EventHandler LogMessagePublished; - IMqttNetChildLogger IMqttNetLogger.CreateChildLogger(string source) + public IMqttNetLogger CreateChildLogger(string source = null) { - return new MqttNetChildLogger(this, source); - } - - IMqttNetChildLogger IMqttNetChildLogger.CreateChildLogger(string source) - { - return new MqttNetChildLogger(this, source); + return new TestLogger(); } public void Verbose(string message, params object[] parameters) @@ -36,5 +31,10 @@ namespace MQTTnet.Tests.Mockups public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) { } + + public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception) + { + throw new NotImplementedException(); + } } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs index ff47f71..95fbdfe 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapterFactory.cs @@ -12,8 +12,8 @@ namespace MQTTnet.Tests.Mockups { _adapter = adapter; } - - public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) { return _adapter; } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs index f990197..79c9739 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestServerWrapper.cs @@ -1,16 +1,16 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Server; using MQTTnet.Server.Status; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests.Mockups { - public class TestServerWrapper : IMqttServer + public sealed class TestServerWrapper : IMqttServer { public TestServerWrapper(IMqttServer implementation, TestContext testContext, TestEnvironment testEnvironment) { @@ -60,22 +60,29 @@ namespace MQTTnet.Tests.Mockups public Task StartAsync(IMqttServerOptions options) { - switch (options) + if (TestContext != null) { - case MqttServerOptionsBuilder builder: - if (builder.Build().ConnectionValidator == null) - { - builder.WithConnectionValidator(ConnectionValidator); - } - break; - case MqttServerOptions op: - if (op.ConnectionValidator == null) - { - op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator); - } - break; - default: - break; + switch (options) + { + case MqttServerOptionsBuilder builder: + { + if (builder.Build().ConnectionValidator == null) + { + builder.WithConnectionValidator(ConnectionValidator); + } + + break; + } + case MqttServerOptions op: + { + if (op.ConnectionValidator == null) + { + op.ConnectionValidator = new MqttServerConnectionValidatorDelegate(ConnectionValidator); + } + + break; + } + } } return Implementation.StartAsync(options); diff --git a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs index eb7aba3..28f2511 100644 --- a/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttFactory_Tests.cs @@ -17,22 +17,23 @@ namespace MQTTnet.Tests //This test compares //1. correct logID - string logId = "logId"; + var logId = "logId"; string invalidLogId = null; //2. if the total log calls are the same for global and local - int globalLogCount = 0; - int localLogCount = 0; + var globalLogCount = 0; + var localLogCount = 0; - MqttNetLogger logger = new MqttNetLogger(logId); + var logger = new MqttNetLogger(logId); //we have a theoretical bug here if a concurrent test is also logging var globalLog = new EventHandler((s, e) => { - if (logId != e.TraceMessage.LogId) + if (logId != e.LogMessage.LogId) { - invalidLogId = e.TraceMessage.LogId; + invalidLogId = e.LogMessage.LogId; } + Interlocked.Increment(ref globalLogCount); }); @@ -40,10 +41,11 @@ namespace MQTTnet.Tests logger.LogMessagePublished += (s, e) => { - if (logId != e.TraceMessage.LogId) + if (logId != e.LogMessage.LogId) { - invalidLogId = e.TraceMessage.LogId; + invalidLogId = e.LogMessage.LogId; } + Interlocked.Increment(ref localLogCount); }; diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 3ebdaa6..7d81eca 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -1,11 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.Sockets; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Adapter; using MQTTnet.Client; using MQTTnet.Client.Connecting; @@ -17,6 +10,13 @@ using MQTTnet.Implementations; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Tests.Mockups; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests { @@ -54,7 +54,7 @@ namespace MQTTnet.Tests MqttQualityOfServiceLevel.AtMostOnce, "A/B/C", MqttQualityOfServiceLevel.AtMostOnce, - 1, + 1, TestContext); } @@ -1016,7 +1016,7 @@ namespace MQTTnet.Tests [TestMethod] public async Task Same_Client_Id_Connect_Disconnect_Event_Order() { - using (var testEnvironment = new TestEnvironment(TestContext)) + using (var testEnvironment = new TestEnvironment()) { var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder()); @@ -1038,11 +1038,11 @@ namespace MQTTnet.Tests } }); - var clientOptions = new MqttClientOptionsBuilder() - .WithClientId("same_id"); + var clientOptionsBuilder = new MqttClientOptionsBuilder() + .WithClientId(Guid.NewGuid().ToString()); // c - var c1 = await testEnvironment.ConnectClientAsync(clientOptions); + var c1 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder); await Task.Delay(500); @@ -1050,7 +1050,7 @@ namespace MQTTnet.Tests Assert.AreEqual("c", flow); // dc - var c2 = await testEnvironment.ConnectClientAsync(clientOptions); + var c2 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder); c2.UseApplicationMessageReceivedHandler(_ => { @@ -1058,8 +1058,8 @@ namespace MQTTnet.Tests { events.Add("r"); } - }); + c2.SubscribeAsync("topic").Wait(); await Task.Delay(500); @@ -1080,7 +1080,7 @@ namespace MQTTnet.Tests Assert.AreEqual(false, c1.IsConnected); await c1.DisconnectAsync(); - Assert.AreEqual (false, c1.IsConnected); + Assert.AreEqual(false, c1.IsConnected); await Task.Delay(500); @@ -1141,7 +1141,7 @@ namespace MQTTnet.Tests await testEnvironment.ConnectClientAsync(); } } - + [TestMethod] public async Task Close_Idle_Connection() { @@ -1182,7 +1182,7 @@ namespace MQTTnet.Tests // forever. This is security related. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort); - + var buffer = Encoding.UTF8.GetBytes("Garbage"); client.Send(buffer, buffer.Length, SocketFlags.None); diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 777c6c0..9b2cbeb 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetCore { public static void Main() { - Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkInfoProvider.TargetFramework}"); + Console.WriteLine($"MQTTnet - TestApp.{TargetFrameworkProvider.TargetFramework}"); Console.WriteLine("1 = Start client"); Console.WriteLine("2 = Start server"); Console.WriteLine("3 = Start performance test");