From 11ade8c3df9d58c7cdb4ac2b5fd708df7fdc052b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 20 Oct 2017 23:30:58 +0200 Subject: [PATCH] Refactor tracing --- Build/MQTTnet.nuspec | 3 + .../MqttCommunicationAdapterFactory.cs | 7 +- .../Implementations/MqttServerAdapter.cs | 14 ++- .../MQTTnet.NetStandard/MqttClientFactory.cs | 9 +- .../MQTTnet.NetStandard/MqttServerFactory.cs | 6 +- .../MqttCommunicationAdapterFactory.cs | 7 +- .../Implementations/MqttServerAdapter.cs | 10 +- .../MqttClientFactory.cs | 9 +- .../MqttServerFactory.cs | 6 +- LICENSE | 2 +- .../MqttChannelCommunicationAdapter.cs | 8 +- MQTTnet.Core/Client/IMqttClientFactory.cs | 7 +- MQTTnet.Core/Client/MqttClient.cs | 42 ++++--- MQTTnet.Core/Client/MqttPacketDispatcher.cs | 8 +- .../Diagnostics/IMqttNetTraceHandler.cs | 9 ++ MQTTnet.Core/Diagnostics/MqttNetTrace.cs | 36 ++++-- .../Diagnostics/MqttNetTraceMessage.cs | 29 +++++ .../MqttNetTraceMessagePublishedEventArgs.cs | 18 +-- .../ManagedClient/ManagedMqttClient.cs | 46 +++++--- .../ManagedMqttClientMessagesManager.cs | 82 ------------- MQTTnet.Core/MqttApplicationMessageBuilder.cs | 108 ++++++++++++++++++ MQTTnet.Core/MqttApplicationMessageFactory.cs | 51 --------- MQTTnet.Core/Server/IMqttServerFactory.cs | 6 +- .../Server/MqttClientPendingMessagesQueue.cs | 12 +- .../MqttClientRetainedMessagesManager.cs | 12 +- MQTTnet.Core/Server/MqttClientSession.cs | 20 ++-- .../Server/MqttClientSessionsManager.cs | 18 +-- MQTTnet.Core/Server/MqttServer.cs | 14 ++- MQTTnet.sln | 1 + README.md | 28 ++++- .../MqttApplicationMessageBuilderTests.cs | 61 ++++++++++ .../MqttApplicationMessageFactoryTests.cs | 30 ----- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 21 ++-- .../TestMqttServerAdapter.cs | 3 +- Tests/MQTTnet.TestApp.NetCore/Program.cs | 69 +++++++++-- .../MainPage.xaml.cs | 6 +- 36 files changed, 497 insertions(+), 321 deletions(-) create mode 100644 MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs create mode 100644 MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs delete mode 100644 MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs create mode 100644 MQTTnet.Core/MqttApplicationMessageBuilder.cs delete mode 100644 MQTTnet.Core/MqttApplicationMessageFactory.cs create mode 100644 Tests/MQTTnet.Core.Tests/MqttApplicationMessageBuilderTests.cs delete mode 100644 Tests/MQTTnet.Core.Tests/MqttApplicationMessageFactoryTests.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index ffc3d7f..128f040 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,6 +11,9 @@ false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). * [Core] Merged the .NET Framwork and netstandard projects (Thanks to @JanEggers) +* [Core] Migrated the trace to a non-static approach (Breaking Change!) +* [Core] Added a builder for application messages using a fluent API +* [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta) Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs index 05e6fb6..0654e27 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs @@ -1,6 +1,7 @@ using System; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Serializer; namespace MQTTnet.Implementations @@ -13,12 +14,14 @@ namespace MQTTnet.Implementations if (options is MqttClientTcpOptions tcpOptions) { - return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + var trace = new MqttNetTrace(); + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); } if (options is MqttClientWebSocketOptions webSocketOptions) { - return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + var trace = new MqttNetTrace(); + return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); } throw new NotSupportedException(); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index b035d89..4b0c178 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -15,11 +15,17 @@ namespace MQTTnet.Implementations { public class MqttServerAdapter : IMqttServerAdapter, IDisposable { + private readonly MqttNetTrace _trace; private CancellationTokenSource _cancellationTokenSource; private Socket _defaultEndpointSocket; private Socket _tlsEndpointSocket; private X509Certificate2 _tlsCertificate; + public MqttServerAdapter(MqttNetTrace trace) + { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + } + public event EventHandler ClientAccepted; public Task StartAsync(MqttServerOptions options) @@ -94,12 +100,12 @@ namespace MQTTnet.Implementations #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _trace); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); + _trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); @@ -122,12 +128,12 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _trace); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); + _trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); //excessive CPU consumed if in endless loop of socket errors await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 0976601..fd5da68 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -1,4 +1,5 @@ using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.ManagedClient; using MQTTnet.Implementations; @@ -6,14 +7,14 @@ namespace MQTTnet { public class MqttClientFactory : IMqttClientFactory { - public IMqttClient CreateMqttClient() + public IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null) { - return new MqttClient(new MqttCommunicationAdapterFactory()); + return new MqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); } - public ManagedMqttClient CreateManagedMqttClient() + public ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null) { - return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); + return new ManagedMqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace()); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs index 70e14f3..468b8e0 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Server; using MQTTnet.Implementations; @@ -8,11 +9,12 @@ namespace MQTTnet { public class MqttServerFactory : IMqttServerFactory { - public IMqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null) { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttServer(options, new List { new MqttServerAdapter() }); + var trace = new MqttNetTrace(traceHandler); + return new MqttServer(options, new List { new MqttServerAdapter(trace) }, trace); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs index 05e6fb6..0654e27 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs @@ -1,6 +1,7 @@ using System; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Serializer; namespace MQTTnet.Implementations @@ -13,12 +14,14 @@ namespace MQTTnet.Implementations if (options is MqttClientTcpOptions tcpOptions) { - return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + var trace = new MqttNetTrace(); + return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); } if (options is MqttClientWebSocketOptions webSocketOptions) { - return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); + var trace = new MqttNetTrace(); + return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); } throw new NotSupportedException(); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs index f33dd59..f596318 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs @@ -10,8 +10,14 @@ namespace MQTTnet.Implementations { public class MqttServerAdapter : IMqttServerAdapter, IDisposable { + private readonly MqttNetTrace _trace; private StreamSocketListener _defaultEndpointSocket; + public MqttServerAdapter(MqttNetTrace trace) + { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + } + public event EventHandler ClientAccepted; public async Task StartAsync(MqttServerOptions options) @@ -50,12 +56,12 @@ namespace MQTTnet.Implementations { try { - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer(), _trace); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); + _trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 0976601..9a254f2 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -1,4 +1,5 @@ using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.ManagedClient; using MQTTnet.Implementations; @@ -6,14 +7,14 @@ namespace MQTTnet { public class MqttClientFactory : IMqttClientFactory { - public IMqttClient CreateMqttClient() + public IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null) { - return new MqttClient(new MqttCommunicationAdapterFactory()); + return new MqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); } - public ManagedMqttClient CreateManagedMqttClient() + public ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null) { - return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); + return new ManagedMqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs index 70e14f3..468b8e0 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Server; using MQTTnet.Implementations; @@ -8,11 +9,12 @@ namespace MQTTnet { public class MqttServerFactory : IMqttServerFactory { - public IMqttServer CreateMqttServer(MqttServerOptions options) + public IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null) { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttServer(options, new List { new MqttServerAdapter() }); + var trace = new MqttNetTrace(traceHandler); + return new MqttServer(options, new List { new MqttServerAdapter(trace) }, trace); } } } diff --git a/LICENSE b/LICENSE index 14484d8..7d84e2b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 Christian +Copyright (c) 2017 Christian Kratky Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 7fae37b..2bed87d 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -14,11 +14,13 @@ namespace MQTTnet.Core.Adapter { public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { + private readonly MqttNetTrace _trace; private readonly IMqttCommunicationChannel _channel; private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); - public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) + public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _channel = channel ?? throw new ArgumentNullException(nameof(channel)); PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); } @@ -94,7 +96,7 @@ namespace MQTTnet.Core.Adapter continue; } - MqttNetTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); + _trace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); var writeBuffer = PacketSerializer.Serialize(packet); await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false); @@ -160,7 +162,7 @@ namespace MQTTnet.Core.Adapter throw new MqttProtocolViolationException("Received malformed packet."); } - MqttNetTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); + _trace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); return packet; } catch (TaskCanceledException) diff --git a/MQTTnet.Core/Client/IMqttClientFactory.cs b/MQTTnet.Core/Client/IMqttClientFactory.cs index 7547906..e63047a 100644 --- a/MQTTnet.Core/Client/IMqttClientFactory.cs +++ b/MQTTnet.Core/Client/IMqttClientFactory.cs @@ -1,11 +1,12 @@ -using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.ManagedClient; namespace MQTTnet.Core.Client { public interface IMqttClientFactory { - IMqttClient CreateMqttClient(); + IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null); - ManagedMqttClient CreateManagedMqttClient(); + ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null); } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 5385790..4e595d7 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -15,8 +15,9 @@ namespace MQTTnet.Core.Client public class MqttClient : IMqttClient { private readonly HashSet _unacknowledgedPublishPackets = new HashSet(); - private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); + private readonly MqttPacketDispatcher _packetDispatcher; private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; + private readonly MqttNetTrace _trace; private IMqttClientOptions _options; private bool _isReceivingPackets; @@ -24,9 +25,12 @@ namespace MQTTnet.Core.Client private CancellationTokenSource _cancellationTokenSource; private IMqttCommunicationAdapter _adapter; - public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) + public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); + + _packetDispatcher = new MqttPacketDispatcher(trace); } public event EventHandler Connected; @@ -49,15 +53,15 @@ namespace MQTTnet.Core.Client _packetDispatcher.Reset(); _adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); - - MqttNetTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); + + _trace.Verbose(nameof(MqttClient), "Trying to connect with server."); await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Verbose(nameof(MqttClient), "Connection with server established."); + _trace.Verbose(nameof(MqttClient), "Connection with server established."); await SetupIncomingPacketProcessingAsync(); await AuthenticateAsync(options.WillMessage); - MqttNetTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); + _trace.Verbose(nameof(MqttClient), "MQTT connection with server established."); if (_options.KeepAlivePeriod != TimeSpan.Zero) { @@ -239,11 +243,11 @@ namespace MQTTnet.Core.Client try { await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); - MqttNetTrace.Information(nameof(MqttClient), "Disconnected from adapter."); + _trace.Information(nameof(MqttClient), "Disconnected from adapter."); } catch (Exception exception) { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); + _trace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); } finally { @@ -255,7 +259,7 @@ namespace MQTTnet.Core.Client { try { - MqttNetTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); + _trace.Information(nameof(MqttClient), "Received <<< {0}", packet); if (packet is MqttPingReqPacket) { @@ -285,7 +289,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); + _trace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); } } @@ -298,7 +302,7 @@ namespace MQTTnet.Core.Client } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); + _trace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); } } @@ -362,7 +366,7 @@ namespace MQTTnet.Core.Client private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) { - MqttNetTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); + _trace.Information(nameof(MqttClient), "Start sending keep alive packets."); try { @@ -387,23 +391,23 @@ namespace MQTTnet.Core.Client return; } - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); + _trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - MqttNetTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); + _trace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - MqttNetTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); + _trace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); } } private async Task ReceivePackets(CancellationToken cancellationToken) { - MqttNetTrace.Information(nameof(MqttClient), "Start receiving packets."); + _trace.Information(nameof(MqttClient), "Start receiving packets."); try { @@ -430,17 +434,17 @@ namespace MQTTnet.Core.Client return; } - MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); + _trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); + _trace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); await DisconnectInternalAsync().ConfigureAwait(false); } finally { - MqttNetTrace.Information(nameof(MqttClient), "Stopped receiving packets."); + _trace.Information(nameof(MqttClient), "Stopped receiving packets."); } } diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index 484012e..40902db 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -12,6 +12,12 @@ namespace MQTTnet.Core.Client { private readonly ConcurrentDictionary> _packetByResponseType = new ConcurrentDictionary>(); private readonly ConcurrentDictionary>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary>>(); + private readonly MqttNetTrace _trace; + + public MqttPacketDispatcher(MqttNetTrace trace) + { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + } public async Task WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout) { @@ -24,7 +30,7 @@ namespace MQTTnet.Core.Client } catch (MqttCommunicationTimedOutException) { - MqttNetTrace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet of type '{0}'.", responseType.Name); + _trace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet of type '{0}'.", responseType.Name); throw; } finally diff --git a/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs b/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs new file mode 100644 index 0000000..969cce0 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/IMqttNetTraceHandler.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Core.Diagnostics +{ + public interface IMqttNetTraceHandler + { + bool IsEnabled { get; } + + void HandleTraceMessage(MqttNetTraceMessage traceMessage); + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs index 4ed4619..35a828c 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetTrace.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetTrace.cs @@ -2,48 +2,62 @@ namespace MQTTnet.Core.Diagnostics { - public static class MqttNetTrace + public sealed class MqttNetTrace : IMqttNetTraceHandler { + private readonly IMqttNetTraceHandler _traceHandler; + + public MqttNetTrace(IMqttNetTraceHandler traceHandler = null) + { + _traceHandler = traceHandler ?? this; + } + public static event EventHandler TraceMessagePublished; - public static void Verbose(string source, string message, params object[] parameters) + public bool IsEnabled => TraceMessagePublished != null; + + public void Verbose(string source, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Verbose, null, message, parameters); } - public static void Information(string source, string message, params object[] parameters) + public void Information(string source, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Information, null, message, parameters); } - public static void Warning(string source, string message, params object[] parameters) + public void Warning(string source, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Warning, null, message, parameters); } - public static void Warning(string source, Exception exception, string message, params object[] parameters) + public void Warning(string source, Exception exception, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Warning, exception, message, parameters); } - public static void Error(string source, string message, params object[] parameters) + public void Error(string source, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Error, null, message, parameters); } - public static void Error(string source, Exception exception, string message, params object[] parameters) + public void Error(string source, Exception exception, string message, params object[] parameters) { Publish(source, MqttNetTraceLevel.Error, exception, message, parameters); } - private static void Publish(string source, MqttNetTraceLevel traceLevel, Exception exception, string message, params object[] parameters) + public void HandleTraceMessage(MqttNetTraceMessage mqttNetTraceMessage) + { + TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(mqttNetTraceMessage)); + } + + private void Publish(string source, MqttNetTraceLevel traceLevel, Exception exception, string message, params object[] parameters) { - var handler = TraceMessagePublished; - if (handler == null) + if (!_traceHandler.IsEnabled) { return; } + var now = DateTime.Now; if (parameters?.Length > 0) { try @@ -57,7 +71,7 @@ namespace MQTTnet.Core.Diagnostics } } - handler.Invoke(null, new MqttNetTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); + _traceHandler.HandleTraceMessage(new MqttNetTraceMessage(now, Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); } } } diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs new file mode 100644 index 0000000..3eb1b56 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/MqttNetTraceMessage.cs @@ -0,0 +1,29 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public sealed class MqttNetTraceMessage + { + public MqttNetTraceMessage(DateTime timestamp, int threadId, string source, MqttNetTraceLevel level, string message, Exception exception) + { + Timestamp = timestamp; + ThreadId = threadId; + Source = source; + Level = level; + Message = message; + Exception = exception; + } + + public DateTime Timestamp { get; } + + public int ThreadId { get; } + + public string Source { get; } + + public MqttNetTraceLevel Level { get; } + + public string Message { get; } + + public Exception Exception { get; } + } +} diff --git a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs b/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs index 05d5396..5b8f796 100644 --- a/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs +++ b/MQTTnet.Core/Diagnostics/MqttNetTraceMessagePublishedEventArgs.cs @@ -4,23 +4,11 @@ namespace MQTTnet.Core.Diagnostics { public sealed class MqttNetTraceMessagePublishedEventArgs : EventArgs { - public MqttNetTraceMessagePublishedEventArgs(int threadId, string source, MqttNetTraceLevel level, string message, Exception exception) + public MqttNetTraceMessagePublishedEventArgs(MqttNetTraceMessage traceMessage) { - ThreadId = threadId; - Source = source; - Level = level; - Message = message; - Exception = exception; + TraceMessage = traceMessage ?? throw new ArgumentNullException(nameof(traceMessage)); } - public int ThreadId { get; } - - public string Source { get; } - - public MqttNetTraceLevel Level { get; } - - public string Message { get; } - - public Exception Exception { get; } + public MqttNetTraceMessage TraceMessage { get; } } } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 1b86a75..989d117 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; @@ -14,36 +15,23 @@ namespace MQTTnet.Core.ManagedClient private readonly List _messageQueue = new List(); private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false); private readonly MqttClient _mqttClient; + private readonly MqttNetTrace _trace; private IManagedMqttClientOptions _options; - public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) + public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) { if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory)); + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); - _mqttClient = new MqttClient(communicationChannelFactory); + _mqttClient = new MqttClient(communicationChannelFactory, _trace); _mqttClient.Connected += OnConnected; _mqttClient.Disconnected += OnDisconnected; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; } - private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) - { - ApplicationMessageReceived?.Invoke(this, e); - } - - private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) - { - //Disconnected?.Invoke(this, e); - } - - private void OnConnected(object sender, EventArgs e) - { - Connected?.Invoke(this, e); - } - public event EventHandler Connected; - public event EventHandler Disconnected; + public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; public bool IsConnected => _mqttClient.IsConnected; @@ -51,7 +39,10 @@ namespace MQTTnet.Core.ManagedClient public void Start(IManagedMqttClientOptions options) { - + if (options == null) throw new ArgumentNullException(nameof(options)); + if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options)); + + } public void Stop() @@ -90,6 +81,8 @@ namespace MQTTnet.Core.ManagedClient public void Enqueue(IEnumerable applicationMessages) { + if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); + ThrowIfNotConnected(); _messageQueue.AddRange(applicationMessages); @@ -152,5 +145,20 @@ namespace MQTTnet.Core.ManagedClient // MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); //} } + + private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) + { + ApplicationMessageReceived?.Invoke(this, eventArgs); + } + + private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) + { + Disconnected?.Invoke(this, eventArgs); + } + + private void OnConnected(object sender, EventArgs eventArgs) + { + Connected?.Invoke(this, eventArgs); + } } } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs deleted file mode 100644 index 7b667ea..0000000 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs +++ /dev/null @@ -1,82 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using MQTTnet.Core.Diagnostics; - -namespace MQTTnet.Core.ManagedClient -{ - public class ManagedMqttClientMessagesManager - { - private readonly IList _persistedMessages = new List(); - private readonly ManagedMqttClientOptions _options; - - public ManagedMqttClientMessagesManager(ManagedMqttClientOptions options) - { - _options = options ?? throw new ArgumentNullException(nameof(options)); - } - - public async Task LoadMessagesAsync() - { - try - { - var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync(); - lock (_persistedMessages) - { - _persistedMessages.Clear(); - foreach (var persistentMessage in persistentMessages) - { - _persistedMessages.Add(persistentMessage); - } - } - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while loading persistent messages."); - } - } - - public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage) - { - if (applicationMessage != null) - { - lock (_persistedMessages) - { - _persistedMessages.Add(applicationMessage); - } - } - try - { - if (_options.Storage != null) - { - await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages); - } - } - catch (Exception exception) - { - MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while saving persistent messages."); - } - } - - public List GetMessages() - { - var persistedMessages = new List(); - lock (_persistedMessages) - { - foreach (var persistedMessage in _persistedMessages) - { - persistedMessages.Add(persistedMessage); - } - } - - return persistedMessages; - } - - public async Task Remove(MqttApplicationMessage message) - { - lock (_persistedMessages) - _persistedMessages.Remove(message); - - await SaveMessageAsync(null); - } - } -} diff --git a/MQTTnet.Core/MqttApplicationMessageBuilder.cs b/MQTTnet.Core/MqttApplicationMessageBuilder.cs new file mode 100644 index 0000000..9905b09 --- /dev/null +++ b/MQTTnet.Core/MqttApplicationMessageBuilder.cs @@ -0,0 +1,108 @@ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core +{ + public class MqttApplicationMessageBuilder + { + private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + private string _topic; + private byte[] _payload; + private bool _retain; + + public MqttApplicationMessageBuilder WithTopic(string topic) + { + _topic = topic; + return this; + } + + public MqttApplicationMessageBuilder WithPayload(IEnumerable payload) + { + if (payload == null) + { + _payload = null; + return this; + } + + _payload = payload.ToArray(); + return this; + } + + public MqttApplicationMessageBuilder WithPayload(MemoryStream payload) + { + if (payload == null) + { + _payload = null; + return this; + } + + if (payload.Length == 0) + { + _payload = new byte[0]; + } + else + { + _payload = new byte[payload.Length - payload.Position]; + payload.Read(_payload, 0, _payload.Length); + } + + return this; + } + + public MqttApplicationMessageBuilder WithPayload(string payload) + { + if (payload == null) + { + _payload = null; + return this; + } + + _payload = string.IsNullOrEmpty(payload) ? new byte[0] : Encoding.UTF8.GetBytes(payload); + return this; + } + + public MqttApplicationMessageBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) + { + _qualityOfServiceLevel = qualityOfServiceLevel; + return this; + } + + public MqttApplicationMessageBuilder WithRetainFlag(bool value = true) + { + _retain = value; + return this; + } + + public MqttApplicationMessageBuilder WithAtLeastOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; + return this; + } + + public MqttApplicationMessageBuilder WithAtMostOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + return this; + } + + public MqttApplicationMessageBuilder WithExactlyOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; + return this; + } + + public MqttApplicationMessage Build() + { + if (string.IsNullOrEmpty(_topic)) + { + throw new MqttProtocolViolationException("Topic is not set."); + } + + return new MqttApplicationMessage(_topic, _payload ?? new byte[0], _qualityOfServiceLevel, _retain); + } + } +} diff --git a/MQTTnet.Core/MqttApplicationMessageFactory.cs b/MQTTnet.Core/MqttApplicationMessageFactory.cs deleted file mode 100644 index c5bbee5..0000000 --- a/MQTTnet.Core/MqttApplicationMessageFactory.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using System.Globalization; -using System.IO; -using System.Text; -using MQTTnet.Core.Protocol; - -namespace MQTTnet.Core -{ - public class MqttApplicationMessageFactory - { - public MqttApplicationMessage CreateApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) - { - if (topic == null) throw new ArgumentNullException(nameof(topic)); - - if (payload == null) - { - payload = new byte[0]; - } - - return new MqttApplicationMessage(topic, payload, qualityOfServiceLevel, retain); - } - - public MqttApplicationMessage CreateApplicationMessage(string topic, TPayload payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) - { - if (topic == null) throw new ArgumentNullException(nameof(topic)); - - var payloadString = Convert.ToString(payload, CultureInfo.InvariantCulture); - var payloadBuffer = string.IsNullOrEmpty(payloadString) ? new byte[0] : Encoding.UTF8.GetBytes(payloadString); - - return CreateApplicationMessage(topic, payloadBuffer, qualityOfServiceLevel, retain); - } - - public MqttApplicationMessage CreateApplicationMessage(string topic, Stream payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) - { - if (topic == null) throw new ArgumentNullException(nameof(topic)); - - byte[] payloadBuffer; - if (payload == null || payload.Length == 0) - { - payloadBuffer = new byte[0]; - } - else - { - payloadBuffer = new byte[payload.Length - payload.Position]; - payload.Read(payloadBuffer, 0, payloadBuffer.Length); - } - - return CreateApplicationMessage(topic, payloadBuffer, qualityOfServiceLevel, retain); - } - } -} diff --git a/MQTTnet.Core/Server/IMqttServerFactory.cs b/MQTTnet.Core/Server/IMqttServerFactory.cs index 4d33a92..865080b 100644 --- a/MQTTnet.Core/Server/IMqttServerFactory.cs +++ b/MQTTnet.Core/Server/IMqttServerFactory.cs @@ -1,7 +1,9 @@ -namespace MQTTnet.Core.Server +using MQTTnet.Core.Diagnostics; + +namespace MQTTnet.Core.Server { public interface IMqttServerFactory { - IMqttServer CreateMqttServer(MqttServerOptions options); + IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null); } } \ No newline at end of file diff --git a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs index 6571a75..551dc8e 100644 --- a/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs +++ b/MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs @@ -15,9 +15,11 @@ namespace MQTTnet.Core.Server private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttClientSession _session; private readonly MqttServerOptions _options; + private readonly MqttNetTrace _trace; - public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session) + public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _session = session ?? throw new ArgumentNullException(nameof(session)); _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -50,7 +52,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets."); + _trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets."); } } @@ -66,18 +68,18 @@ namespace MQTTnet.Core.Server { if (exception is MqttCommunicationTimedOutException) { - MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout."); + _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout."); } else if (exception is MqttCommunicationException) { - MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); + _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); } if (exception is OperationCanceledException) { } else { - MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); + _trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); } if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs index 77b49bf..fe95f01 100644 --- a/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs +++ b/MQTTnet.Core/Server/MqttClientRetainedMessagesManager.cs @@ -10,11 +10,13 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientRetainedMessagesManager { + private readonly MqttNetTrace _trace; private readonly Dictionary _retainedMessages = new Dictionary(); private readonly MqttServerOptions _options; - public MqttClientRetainedMessagesManager(MqttServerOptions options) + public MqttClientRetainedMessagesManager(MqttServerOptions options, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _options = options ?? throw new ArgumentNullException(nameof(options)); } @@ -39,7 +41,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while loading retained messages."); + _trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while loading retained messages."); } } @@ -53,12 +55,12 @@ namespace MQTTnet.Core.Server if (publishPacket.Payload?.Any() == false) { _retainedMessages.Remove(publishPacket.Topic); - MqttNetTrace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, publishPacket.Topic); + _trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, publishPacket.Topic); } else { _retainedMessages[publishPacket.Topic] = publishPacket; - MqttNetTrace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, publishPacket.Topic); + _trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, publishPacket.Topic); } allRetainedMessages = new List(_retainedMessages.Values); @@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while saving retained messages."); + _trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while saving retained messages."); } } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 37d168a..9b6cce5 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -20,17 +20,19 @@ namespace MQTTnet.Core.Server private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttServerOptions _options; + private readonly MqttNetTrace _trace; private IMqttCommunicationAdapter _adapter; private CancellationTokenSource _cancellationTokenSource; private MqttApplicationMessage _willMessage; - public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager) + public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); ClientId = clientId; _options = options ?? throw new ArgumentNullException(nameof(options)); _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager)); - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this); + _pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this, trace); } public string ClientId { get; } @@ -58,11 +60,11 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); } } @@ -79,7 +81,7 @@ namespace MQTTnet.Core.Server _adapter = null; - MqttNetTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); + _trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); } public void EnqueuePublishPacket(MqttPublishPacket publishPacket) @@ -92,7 +94,7 @@ namespace MQTTnet.Core.Server } _pendingMessagesQueue.Enqueue(publishPacket); - MqttNetTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); + _trace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); } public void Dispose() @@ -116,12 +118,12 @@ namespace MQTTnet.Core.Server } catch (MqttCommunicationException exception) { - MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); + _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); Stop(); } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); + _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); Stop(); } } @@ -163,7 +165,7 @@ namespace MQTTnet.Core.Server } else { - MqttNetTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); + _trace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); Stop(); } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 9c9fbfd..3a93d08 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -17,11 +17,13 @@ namespace MQTTnet.Core.Server { private readonly Dictionary _clientSessions = new Dictionary(); private readonly MqttServerOptions _options; + private readonly MqttNetTrace _trace; - public MqttClientSessionsManager(MqttServerOptions options) + public MqttClientSessionsManager(MqttServerOptions options, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _options = options ?? throw new ArgumentNullException(nameof(options)); - RetainedMessagesManager = new MqttClientRetainedMessagesManager(options); + RetainedMessagesManager = new MqttClientRetainedMessagesManager(options, trace); } public event EventHandler ApplicationMessageReceived; @@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttServer), exception, exception.Message); + _trace.Error(nameof(MqttServer), exception, exception.Message); } finally { @@ -124,7 +126,7 @@ namespace MQTTnet.Core.Server } catch (Exception exception) { - MqttNetTrace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); + _trace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); } lock (_clientSessions) @@ -158,11 +160,11 @@ namespace MQTTnet.Core.Server _clientSessions.Remove(connectPacket.ClientId); clientSession.Dispose(); clientSession = null; - MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); + _trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); } else { - MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); + _trace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); } } @@ -171,10 +173,10 @@ namespace MQTTnet.Core.Server { isExistingSession = false; - clientSession = new MqttClientSession(connectPacket.ClientId, _options, this); + clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _trace); _clientSessions[connectPacket.ClientId] = clientSession; - MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); + _trace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); } return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs index d1615d8..7b13d71 100644 --- a/MQTTnet.Core/Server/MqttServer.cs +++ b/MQTTnet.Core/Server/MqttServer.cs @@ -10,18 +10,20 @@ namespace MQTTnet.Core.Server { public sealed class MqttServer : IMqttServer { + private readonly MqttNetTrace _trace; private readonly MqttClientSessionsManager _clientSessionsManager; private readonly ICollection _adapters; private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; - public MqttServer(MqttServerOptions options, ICollection adapters) + public MqttServer(MqttServerOptions options, ICollection adapters, MqttNetTrace trace) { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _options = options ?? throw new ArgumentNullException(nameof(options)); _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters)); - _clientSessionsManager = new MqttClientSessionsManager(options); + _clientSessionsManager = new MqttClientSessionsManager(options, trace); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); _clientSessionsManager.ClientConnected += OnClientConnected; _clientSessionsManager.ClientDisconnected += OnClientDisconnected; @@ -57,7 +59,7 @@ namespace MQTTnet.Core.Server await adapter.StartAsync(_options); } - MqttNetTrace.Information(nameof(MqttServer), "Started."); + _trace.Information(nameof(MqttServer), "Started."); } public async Task StopAsync() @@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server _clientSessionsManager.Clear(); - MqttNetTrace.Information(nameof(MqttServer), "Stopped."); + _trace.Information(nameof(MqttServer), "Stopped."); } private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) @@ -84,13 +86,13 @@ namespace MQTTnet.Core.Server private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) { - MqttNetTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); + _trace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); ClientConnected?.Invoke(this, eventArgs); } private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) { - MqttNetTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); + _trace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); ClientDisconnected?.Invoke(this, eventArgs); } } diff --git a/MQTTnet.sln b/MQTTnet.sln index 4d49d12..79e8b4d 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -25,6 +25,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" ProjectSection(SolutionItems) = preProject + LICENSE = LICENSE README.md = README.md EndProjectSection EndProject diff --git a/README.md b/README.md index 2ded79f..3d7be1e 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![BCH compliance](https://bettercodehub.com/edge/badge/chkr1011/MQTTnet?branch=master)](https://bettercodehub.com/) # MQTTnet -MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. +MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. # Features @@ -18,7 +18,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien * Performance optimized (processing ~27.000 messages / second)* * Interfaces included for mocking and testing * Access to internal trace messages -* Unit tested (62+ tests) +* Unit tested (67+ tests) \* Tested on local machine with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetFramework_. @@ -61,4 +61,26 @@ This library is used in the following projects: * MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) * Wirehome (Open Source Home Automation system for .NET, https://github.com/chkr1011/Wirehome) -If you use this library and want to see your project here please let me know. \ No newline at end of file +If you use this library and want to see your project here please let me know. + +# MIT License + +Copyright (c) 2017 Christian Kratky + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MqttApplicationMessageBuilderTests.cs b/Tests/MQTTnet.Core.Tests/MqttApplicationMessageBuilderTests.cs new file mode 100644 index 0000000..c6a7017 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/MqttApplicationMessageBuilderTests.cs @@ -0,0 +1,61 @@ +using System; +using System.IO; +using System.Text; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class MqttApplicationMessageBuilderTests + { + [TestMethod] + public void CreateApplicationMessage_TopicOnly() + { + var message = new MqttApplicationMessageBuilder().WithTopic("Abc").Build(); + Assert.AreEqual("Abc", message.Topic); + Assert.IsFalse(message.Retain); + Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); + } + + [TestMethod] + public void CreateApplicationMessage_TimeStampPayload() + { + var message = new MqttApplicationMessageBuilder().WithTopic("xyz").WithPayload(TimeSpan.FromSeconds(360).ToString()).Build(); + Assert.AreEqual("xyz", message.Topic); + Assert.IsFalse(message.Retain); + Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); + Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "00:06:00"); + } + + [TestMethod] + public void CreateApplicationMessage_StreamPayload() + { + var stream = new MemoryStream(Encoding.UTF8.GetBytes("xHello")) { Position = 1 }; + + var message = new MqttApplicationMessageBuilder().WithTopic("123").WithPayload(stream).Build(); + Assert.AreEqual("123", message.Topic); + Assert.IsFalse(message.Retain); + Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); + Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "Hello"); + } + + [TestMethod] + public void CreateApplicationMessage_Retained() + { + var message = new MqttApplicationMessageBuilder().WithTopic("lol").WithRetainFlag().Build(); + Assert.AreEqual("lol", message.Topic); + Assert.IsTrue(message.Retain); + Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); + } + + [TestMethod] + public void CreateApplicationMessage_QosLevel2() + { + var message = new MqttApplicationMessageBuilder().WithTopic("rofl").WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build(); + Assert.AreEqual("rofl", message.Topic); + Assert.IsTrue(message.Retain); + Assert.AreEqual(MqttQualityOfServiceLevel.ExactlyOnce, message.QualityOfServiceLevel); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MqttApplicationMessageFactoryTests.cs b/Tests/MQTTnet.Core.Tests/MqttApplicationMessageFactoryTests.cs deleted file mode 100644 index f0f464a..0000000 --- a/Tests/MQTTnet.Core.Tests/MqttApplicationMessageFactoryTests.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using System.Text; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Core.Protocol; - -namespace MQTTnet.Core.Tests -{ - [TestClass] - public class MqttApplicationMessageFactoryTests - { - [TestMethod] - public void CreateApplicationMessage_TopicOnly() - { - var message = new MqttApplicationMessageFactory().CreateApplicationMessage("Abc", MqttQualityOfServiceLevel.AtLeastOnce); - Assert.AreEqual("Abc", message.Topic); - Assert.IsFalse(message.Retain); - Assert.AreEqual(MqttQualityOfServiceLevel.AtLeastOnce, message.QualityOfServiceLevel); - } - - [TestMethod] - public void CreateApplicationMessage_TimeStampPayload() - { - var message = new MqttApplicationMessageFactory().CreateApplicationMessage("xyz", TimeSpan.FromSeconds(360)); - Assert.AreEqual("xyz", message.Topic); - Assert.IsFalse(message.Retain); - Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); - Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "00:06:00"); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 0e786df..63ef0bb 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; @@ -49,7 +50,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_WillMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); @@ -73,7 +74,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Unsubscribe() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -109,7 +110,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_Publish() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -132,7 +133,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_NoRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -155,7 +156,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_RetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -178,7 +179,7 @@ namespace MQTTnet.Core.Tests public async Task MqttServer_ClearRetainedMessage() { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -204,7 +205,7 @@ namespace MQTTnet.Core.Tests var storage = new TestStorage(); var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); @@ -213,7 +214,7 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); - s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }); + s = new MqttServer(new MqttServerOptions { Storage = storage }, new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c2 = await serverAdapter.ConnectTestClient(s, "c2"); @@ -235,7 +236,7 @@ namespace MQTTnet.Core.Tests public Task SaveRetainedMessagesAsync(IList messages) { _messages = messages; - return Task.FromResult(0); + return Task.CompletedTask; } public Task> LoadRetainedMessagesAsync() @@ -252,7 +253,7 @@ namespace MQTTnet.Core.Tests int expectedReceivedMessagesCount) { var serverAdapter = new TestMqttServerAdapter(); - var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }); + var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index 6cfa9bb..32fc2cc 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Server; using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; namespace MQTTnet.Core.Tests { @@ -17,7 +18,7 @@ namespace MQTTnet.Core.Tests adapterA.Partner = adapterB; adapterB.Partner = adapterA; - var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA)); + var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA), new MqttNetTrace()); var connected = WaitForClientToConnect(server, clientId); FireClientAcceptedEvent(adapterB); diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 88130cb..dab8235 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -45,10 +45,10 @@ namespace MQTTnet.TestApp.NetCore { MqttNetTrace.TraceMessagePublished += (s, e) => { - Console.WriteLine($">> [{DateTime.Now:O}] [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); - if (e.Exception != null) + Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); + if (e.TraceMessage.Exception != null) { - Console.WriteLine(e.Exception); + Console.WriteLine(e.TraceMessage.Exception); } }; @@ -134,10 +134,10 @@ namespace MQTTnet.TestApp.NetCore { MqttNetTrace.TraceMessagePublished += (s, e) => { - Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); - if (e.Exception != null) + Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); + if (e.TraceMessage.Exception != null) { - Console.WriteLine(e.Exception); + Console.WriteLine(e.TraceMessage.Exception); } }; @@ -162,11 +162,11 @@ namespace MQTTnet.TestApp.NetCore options.Storage = new RetainedMessageHandler(); - var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); - options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); - options.ConnectionBacklog = 5; - options.DefaultEndpointOptions.IsEnabled = true; - options.TlsEndpointOptions.IsEnabled = false; + //var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); + //options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); + //options.ConnectionBacklog = 5; + //options.DefaultEndpointOptions.IsEnabled = true; + //options.TlsEndpointOptions.IsEnabled = false; var mqttServer = new MqttServerFactory().CreateMqttServer(options); mqttServer.ClientDisconnected += (s, e) => @@ -189,6 +189,51 @@ namespace MQTTnet.TestApp.NetCore Console.ReadLine(); return Task.FromResult(0); } + + // ReSharper disable once UnusedMember.Local + private static async void WikiCode() + { + { + var client = new MqttClientFactory().CreateMqttClient(new CustomTraceHandler("Client 1")); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("MyTopic") + .WithPayload("Hello World") + .WithExactlyOnceQoS() + .WithRetainFlag() + .Build(); + + await client.PublishAsync(message); + } + + { + var message = new MqttApplicationMessageBuilder() + .WithTopic("/MQTTnet/is/awesome") + .Build(); + } + } + } + + public class CustomTraceHandler : IMqttNetTraceHandler + { + private readonly string _clientId; + + public CustomTraceHandler(string clientId) + { + _clientId = clientId; + } + + public bool IsEnabled { get; } = true; + + public void HandleTraceMessage(MqttNetTraceMessage traceMessage) + { + // Client ID is added to the trace message. + Console.WriteLine($">> [{_clientId}] [{traceMessage.Timestamp:O}] [{traceMessage.ThreadId}] [{traceMessage.Source}] [{traceMessage.Level}]: {traceMessage.Message}"); + if (traceMessage.Exception != null) + { + Console.WriteLine(traceMessage.Exception); + } + } } public class RetainedMessageHandler : IMqttServerStorage @@ -213,7 +258,7 @@ namespace MQTTnet.TestApp.NetCore { retainedMessages = new List(); } - + return Task.FromResult(retainedMessages); } } diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index de795d5..8826e13 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -29,10 +29,10 @@ namespace MQTTnet.TestApp.UniversalWindows { await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.High, () => { - var text = $"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] [{e.Level}] [{e.Source}] [{e.ThreadId}] [{e.Message}]{Environment.NewLine}"; - if (e.Exception != null) + var text = $"[{e.TraceMessage.Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{e.TraceMessage.Level}] [{e.TraceMessage.Source}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Message}]{Environment.NewLine}"; + if (e.TraceMessage.Exception != null) { - text += $"{e.Exception}{Environment.NewLine}"; + text += $"{e.TraceMessage.Exception}{Environment.NewLine}"; } Trace.Text += text;