@@ -11,6 +11,9 @@ | |||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | |||
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | |||
<releaseNotes>* [Core] Merged the .NET Framwork and netstandard projects (Thanks to @JanEggers) | |||
* [Core] Migrated the trace to a non-static approach (Breaking Change!) | |||
* [Core] Added a builder for application messages using a fluent API | |||
* [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta) | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2017</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M</tags> | |||
@@ -1,6 +1,7 @@ | |||
using System; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Serializer; | |||
namespace MQTTnet.Implementations | |||
@@ -13,12 +14,14 @@ namespace MQTTnet.Implementations | |||
if (options is MqttClientTcpOptions tcpOptions) | |||
{ | |||
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); | |||
var trace = new MqttNetTrace(); | |||
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); | |||
} | |||
if (options is MqttClientWebSocketOptions webSocketOptions) | |||
{ | |||
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); | |||
var trace = new MqttNetTrace(); | |||
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); | |||
} | |||
throw new NotSupportedException(); | |||
@@ -15,11 +15,17 @@ namespace MQTTnet.Implementations | |||
{ | |||
public class MqttServerAdapter : IMqttServerAdapter, IDisposable | |||
{ | |||
private readonly MqttNetTrace _trace; | |||
private CancellationTokenSource _cancellationTokenSource; | |||
private Socket _defaultEndpointSocket; | |||
private Socket _tlsEndpointSocket; | |||
private X509Certificate2 _tlsCertificate; | |||
public MqttServerAdapter(MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
} | |||
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; | |||
public Task StartAsync(MqttServerOptions options) | |||
@@ -94,12 +100,12 @@ namespace MQTTnet.Implementations | |||
#else | |||
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); | |||
#endif | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _trace); | |||
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); | |||
_trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); | |||
//excessive CPU consumed if in endless loop of socket errors | |||
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | |||
@@ -122,12 +128,12 @@ namespace MQTTnet.Implementations | |||
var sslStream = new SslStream(new NetworkStream(clientSocket)); | |||
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _trace); | |||
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); | |||
_trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); | |||
//excessive CPU consumed if in endless loop of socket errors | |||
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | |||
@@ -1,4 +1,5 @@ | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.ManagedClient; | |||
using MQTTnet.Implementations; | |||
@@ -6,14 +7,14 @@ namespace MQTTnet | |||
{ | |||
public class MqttClientFactory : IMqttClientFactory | |||
{ | |||
public IMqttClient CreateMqttClient() | |||
public IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
return new MqttClient(new MqttCommunicationAdapterFactory()); | |||
return new MqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); | |||
} | |||
public ManagedMqttClient CreateManagedMqttClient() | |||
public ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); | |||
return new ManagedMqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace()); | |||
} | |||
} | |||
} |
@@ -1,6 +1,7 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Server; | |||
using MQTTnet.Implementations; | |||
@@ -8,11 +9,12 @@ namespace MQTTnet | |||
{ | |||
public class MqttServerFactory : IMqttServerFactory | |||
{ | |||
public IMqttServer CreateMqttServer(MqttServerOptions options) | |||
public IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
return new MqttServer(options, new List<IMqttServerAdapter> { new MqttServerAdapter() }); | |||
var trace = new MqttNetTrace(traceHandler); | |||
return new MqttServer(options, new List<IMqttServerAdapter> { new MqttServerAdapter(trace) }, trace); | |||
} | |||
} | |||
} |
@@ -1,6 +1,7 @@ | |||
using System; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Serializer; | |||
namespace MQTTnet.Implementations | |||
@@ -13,12 +14,14 @@ namespace MQTTnet.Implementations | |||
if (options is MqttClientTcpOptions tcpOptions) | |||
{ | |||
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); | |||
var trace = new MqttNetTrace(); | |||
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); | |||
} | |||
if (options is MqttClientWebSocketOptions webSocketOptions) | |||
{ | |||
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }); | |||
var trace = new MqttNetTrace(); | |||
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }, trace); | |||
} | |||
throw new NotSupportedException(); | |||
@@ -10,8 +10,14 @@ namespace MQTTnet.Implementations | |||
{ | |||
public class MqttServerAdapter : IMqttServerAdapter, IDisposable | |||
{ | |||
private readonly MqttNetTrace _trace; | |||
private StreamSocketListener _defaultEndpointSocket; | |||
public MqttServerAdapter(MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
} | |||
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; | |||
public async Task StartAsync(MqttServerOptions options) | |||
@@ -50,12 +56,12 @@ namespace MQTTnet.Implementations | |||
{ | |||
try | |||
{ | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); | |||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer(), _trace); | |||
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); | |||
_trace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); | |||
} | |||
} | |||
} |
@@ -1,4 +1,5 @@ | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.ManagedClient; | |||
using MQTTnet.Implementations; | |||
@@ -6,14 +7,14 @@ namespace MQTTnet | |||
{ | |||
public class MqttClientFactory : IMqttClientFactory | |||
{ | |||
public IMqttClient CreateMqttClient() | |||
public IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
return new MqttClient(new MqttCommunicationAdapterFactory()); | |||
return new MqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); | |||
} | |||
public ManagedMqttClient CreateManagedMqttClient() | |||
public ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
return new ManagedMqttClient(new MqttCommunicationAdapterFactory()); | |||
return new ManagedMqttClient(new MqttCommunicationAdapterFactory(), new MqttNetTrace(traceHandler)); | |||
} | |||
} | |||
} |
@@ -1,6 +1,7 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Server; | |||
using MQTTnet.Implementations; | |||
@@ -8,11 +9,12 @@ namespace MQTTnet | |||
{ | |||
public class MqttServerFactory : IMqttServerFactory | |||
{ | |||
public IMqttServer CreateMqttServer(MqttServerOptions options) | |||
public IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
return new MqttServer(options, new List<IMqttServerAdapter> { new MqttServerAdapter() }); | |||
var trace = new MqttNetTrace(traceHandler); | |||
return new MqttServer(options, new List<IMqttServerAdapter> { new MqttServerAdapter(trace) }, trace); | |||
} | |||
} | |||
} |
@@ -1,6 +1,6 @@ | |||
MIT License | |||
Copyright (c) 2017 Christian | |||
Copyright (c) 2017 Christian Kratky | |||
Permission is hereby granted, free of charge, to any person obtaining a copy | |||
of this software and associated documentation files (the "Software"), to deal | |||
@@ -14,11 +14,13 @@ namespace MQTTnet.Core.Adapter | |||
{ | |||
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter | |||
{ | |||
private readonly MqttNetTrace _trace; | |||
private readonly IMqttCommunicationChannel _channel; | |||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); | |||
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) | |||
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_channel = channel ?? throw new ArgumentNullException(nameof(channel)); | |||
PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); | |||
} | |||
@@ -94,7 +96,7 @@ namespace MQTTnet.Core.Adapter | |||
continue; | |||
} | |||
MqttNetTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); | |||
_trace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); | |||
var writeBuffer = PacketSerializer.Serialize(packet); | |||
await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false); | |||
@@ -160,7 +162,7 @@ namespace MQTTnet.Core.Adapter | |||
throw new MqttProtocolViolationException("Received malformed packet."); | |||
} | |||
MqttNetTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); | |||
_trace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet); | |||
return packet; | |||
} | |||
catch (TaskCanceledException) | |||
@@ -1,11 +1,12 @@ | |||
using MQTTnet.Core.ManagedClient; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.ManagedClient; | |||
namespace MQTTnet.Core.Client | |||
{ | |||
public interface IMqttClientFactory | |||
{ | |||
IMqttClient CreateMqttClient(); | |||
IMqttClient CreateMqttClient(IMqttNetTraceHandler traceHandler = null); | |||
ManagedMqttClient CreateManagedMqttClient(); | |||
ManagedMqttClient CreateManagedMqttClient(IMqttNetTraceHandler traceHandler = null); | |||
} | |||
} |
@@ -15,8 +15,9 @@ namespace MQTTnet.Core.Client | |||
public class MqttClient : IMqttClient | |||
{ | |||
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>(); | |||
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); | |||
private readonly MqttPacketDispatcher _packetDispatcher; | |||
private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory; | |||
private readonly MqttNetTrace _trace; | |||
private IMqttClientOptions _options; | |||
private bool _isReceivingPackets; | |||
@@ -24,9 +25,12 @@ namespace MQTTnet.Core.Client | |||
private CancellationTokenSource _cancellationTokenSource; | |||
private IMqttCommunicationAdapter _adapter; | |||
public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) | |||
public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_communicationChannelFactory = communicationChannelFactory ?? throw new ArgumentNullException(nameof(communicationChannelFactory)); | |||
_packetDispatcher = new MqttPacketDispatcher(trace); | |||
} | |||
public event EventHandler Connected; | |||
@@ -49,15 +53,15 @@ namespace MQTTnet.Core.Client | |||
_packetDispatcher.Reset(); | |||
_adapter = _communicationChannelFactory.CreateMqttCommunicationAdapter(options); | |||
MqttNetTrace.Verbose(nameof(MqttClient), "Trying to connect with server."); | |||
_trace.Verbose(nameof(MqttClient), "Trying to connect with server."); | |||
await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); | |||
MqttNetTrace.Verbose(nameof(MqttClient), "Connection with server established."); | |||
_trace.Verbose(nameof(MqttClient), "Connection with server established."); | |||
await SetupIncomingPacketProcessingAsync(); | |||
await AuthenticateAsync(options.WillMessage); | |||
MqttNetTrace.Verbose(nameof(MqttClient), "MQTT connection with server established."); | |||
_trace.Verbose(nameof(MqttClient), "MQTT connection with server established."); | |||
if (_options.KeepAlivePeriod != TimeSpan.Zero) | |||
{ | |||
@@ -239,11 +243,11 @@ namespace MQTTnet.Core.Client | |||
try | |||
{ | |||
await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false); | |||
MqttNetTrace.Information(nameof(MqttClient), "Disconnected from adapter."); | |||
_trace.Information(nameof(MqttClient), "Disconnected from adapter."); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); | |||
_trace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter."); | |||
} | |||
finally | |||
{ | |||
@@ -255,7 +259,7 @@ namespace MQTTnet.Core.Client | |||
{ | |||
try | |||
{ | |||
MqttNetTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); | |||
_trace.Information(nameof(MqttClient), "Received <<< {0}", packet); | |||
if (packet is MqttPingReqPacket) | |||
{ | |||
@@ -285,7 +289,7 @@ namespace MQTTnet.Core.Client | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); | |||
_trace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet."); | |||
} | |||
} | |||
@@ -298,7 +302,7 @@ namespace MQTTnet.Core.Client | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); | |||
_trace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message."); | |||
} | |||
} | |||
@@ -362,7 +366,7 @@ namespace MQTTnet.Core.Client | |||
private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) | |||
{ | |||
MqttNetTrace.Information(nameof(MqttClient), "Start sending keep alive packets."); | |||
_trace.Information(nameof(MqttClient), "Start sending keep alive packets."); | |||
try | |||
{ | |||
@@ -387,23 +391,23 @@ namespace MQTTnet.Core.Client | |||
return; | |||
} | |||
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); | |||
_trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets."); | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); | |||
_trace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets."); | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); | |||
_trace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); | |||
} | |||
} | |||
private async Task ReceivePackets(CancellationToken cancellationToken) | |||
{ | |||
MqttNetTrace.Information(nameof(MqttClient), "Start receiving packets."); | |||
_trace.Information(nameof(MqttClient), "Start receiving packets."); | |||
try | |||
{ | |||
@@ -430,17 +434,17 @@ namespace MQTTnet.Core.Client | |||
return; | |||
} | |||
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); | |||
_trace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); | |||
_trace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); | |||
await DisconnectInternalAsync().ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
MqttNetTrace.Information(nameof(MqttClient), "Stopped receiving packets."); | |||
_trace.Information(nameof(MqttClient), "Stopped receiving packets."); | |||
} | |||
} | |||
@@ -12,6 +12,12 @@ namespace MQTTnet.Core.Client | |||
{ | |||
private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>(); | |||
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>(); | |||
private readonly MqttNetTrace _trace; | |||
public MqttPacketDispatcher(MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
} | |||
public async Task<MqttBasePacket> WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout) | |||
{ | |||
@@ -24,7 +30,7 @@ namespace MQTTnet.Core.Client | |||
} | |||
catch (MqttCommunicationTimedOutException) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet of type '{0}'.", responseType.Name); | |||
_trace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet of type '{0}'.", responseType.Name); | |||
throw; | |||
} | |||
finally | |||
@@ -0,0 +1,9 @@ | |||
namespace MQTTnet.Core.Diagnostics | |||
{ | |||
public interface IMqttNetTraceHandler | |||
{ | |||
bool IsEnabled { get; } | |||
void HandleTraceMessage(MqttNetTraceMessage traceMessage); | |||
} | |||
} |
@@ -2,48 +2,62 @@ | |||
namespace MQTTnet.Core.Diagnostics | |||
{ | |||
public static class MqttNetTrace | |||
public sealed class MqttNetTrace : IMqttNetTraceHandler | |||
{ | |||
private readonly IMqttNetTraceHandler _traceHandler; | |||
public MqttNetTrace(IMqttNetTraceHandler traceHandler = null) | |||
{ | |||
_traceHandler = traceHandler ?? this; | |||
} | |||
public static event EventHandler<MqttNetTraceMessagePublishedEventArgs> TraceMessagePublished; | |||
public static void Verbose(string source, string message, params object[] parameters) | |||
public bool IsEnabled => TraceMessagePublished != null; | |||
public void Verbose(string source, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Verbose, null, message, parameters); | |||
} | |||
public static void Information(string source, string message, params object[] parameters) | |||
public void Information(string source, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Information, null, message, parameters); | |||
} | |||
public static void Warning(string source, string message, params object[] parameters) | |||
public void Warning(string source, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Warning, null, message, parameters); | |||
} | |||
public static void Warning(string source, Exception exception, string message, params object[] parameters) | |||
public void Warning(string source, Exception exception, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Warning, exception, message, parameters); | |||
} | |||
public static void Error(string source, string message, params object[] parameters) | |||
public void Error(string source, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Error, null, message, parameters); | |||
} | |||
public static void Error(string source, Exception exception, string message, params object[] parameters) | |||
public void Error(string source, Exception exception, string message, params object[] parameters) | |||
{ | |||
Publish(source, MqttNetTraceLevel.Error, exception, message, parameters); | |||
} | |||
private static void Publish(string source, MqttNetTraceLevel traceLevel, Exception exception, string message, params object[] parameters) | |||
public void HandleTraceMessage(MqttNetTraceMessage mqttNetTraceMessage) | |||
{ | |||
TraceMessagePublished?.Invoke(this, new MqttNetTraceMessagePublishedEventArgs(mqttNetTraceMessage)); | |||
} | |||
private void Publish(string source, MqttNetTraceLevel traceLevel, Exception exception, string message, params object[] parameters) | |||
{ | |||
var handler = TraceMessagePublished; | |||
if (handler == null) | |||
if (!_traceHandler.IsEnabled) | |||
{ | |||
return; | |||
} | |||
var now = DateTime.Now; | |||
if (parameters?.Length > 0) | |||
{ | |||
try | |||
@@ -57,7 +71,7 @@ namespace MQTTnet.Core.Diagnostics | |||
} | |||
} | |||
handler.Invoke(null, new MqttNetTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); | |||
_traceHandler.HandleTraceMessage(new MqttNetTraceMessage(now, Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); | |||
} | |||
} | |||
} |
@@ -0,0 +1,29 @@ | |||
using System; | |||
namespace MQTTnet.Core.Diagnostics | |||
{ | |||
public sealed class MqttNetTraceMessage | |||
{ | |||
public MqttNetTraceMessage(DateTime timestamp, int threadId, string source, MqttNetTraceLevel level, string message, Exception exception) | |||
{ | |||
Timestamp = timestamp; | |||
ThreadId = threadId; | |||
Source = source; | |||
Level = level; | |||
Message = message; | |||
Exception = exception; | |||
} | |||
public DateTime Timestamp { get; } | |||
public int ThreadId { get; } | |||
public string Source { get; } | |||
public MqttNetTraceLevel Level { get; } | |||
public string Message { get; } | |||
public Exception Exception { get; } | |||
} | |||
} |
@@ -4,23 +4,11 @@ namespace MQTTnet.Core.Diagnostics | |||
{ | |||
public sealed class MqttNetTraceMessagePublishedEventArgs : EventArgs | |||
{ | |||
public MqttNetTraceMessagePublishedEventArgs(int threadId, string source, MqttNetTraceLevel level, string message, Exception exception) | |||
public MqttNetTraceMessagePublishedEventArgs(MqttNetTraceMessage traceMessage) | |||
{ | |||
ThreadId = threadId; | |||
Source = source; | |||
Level = level; | |||
Message = message; | |||
Exception = exception; | |||
TraceMessage = traceMessage ?? throw new ArgumentNullException(nameof(traceMessage)); | |||
} | |||
public int ThreadId { get; } | |||
public string Source { get; } | |||
public MqttNetTraceLevel Level { get; } | |||
public string Message { get; } | |||
public Exception Exception { get; } | |||
public MqttNetTraceMessage TraceMessage { get; } | |||
} | |||
} |
@@ -4,6 +4,7 @@ using System.Linq; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Exceptions; | |||
using MQTTnet.Core.Packets; | |||
@@ -14,36 +15,23 @@ namespace MQTTnet.Core.ManagedClient | |||
private readonly List<MqttApplicationMessage> _messageQueue = new List<MqttApplicationMessage>(); | |||
private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false); | |||
private readonly MqttClient _mqttClient; | |||
private readonly MqttNetTrace _trace; | |||
private IManagedMqttClientOptions _options; | |||
public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory) | |||
public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) | |||
{ | |||
if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory)); | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_mqttClient = new MqttClient(communicationChannelFactory); | |||
_mqttClient = new MqttClient(communicationChannelFactory, _trace); | |||
_mqttClient.Connected += OnConnected; | |||
_mqttClient.Disconnected += OnDisconnected; | |||
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; | |||
} | |||
private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) | |||
{ | |||
ApplicationMessageReceived?.Invoke(this, e); | |||
} | |||
private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) | |||
{ | |||
//Disconnected?.Invoke(this, e); | |||
} | |||
private void OnConnected(object sender, EventArgs e) | |||
{ | |||
Connected?.Invoke(this, e); | |||
} | |||
public event EventHandler Connected; | |||
public event EventHandler Disconnected; | |||
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | |||
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | |||
public bool IsConnected => _mqttClient.IsConnected; | |||
@@ -51,7 +39,10 @@ namespace MQTTnet.Core.ManagedClient | |||
public void Start(IManagedMqttClientOptions options) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options)); | |||
} | |||
public void Stop() | |||
@@ -90,6 +81,8 @@ namespace MQTTnet.Core.ManagedClient | |||
public void Enqueue(IEnumerable<MqttApplicationMessage> applicationMessages) | |||
{ | |||
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); | |||
ThrowIfNotConnected(); | |||
_messageQueue.AddRange(applicationMessages); | |||
@@ -152,5 +145,20 @@ namespace MQTTnet.Core.ManagedClient | |||
// MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); | |||
//} | |||
} | |||
private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) | |||
{ | |||
ApplicationMessageReceived?.Invoke(this, eventArgs); | |||
} | |||
private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) | |||
{ | |||
Disconnected?.Invoke(this, eventArgs); | |||
} | |||
private void OnConnected(object sender, EventArgs eventArgs) | |||
{ | |||
Connected?.Invoke(this, eventArgs); | |||
} | |||
} | |||
} |
@@ -1,82 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Core.Diagnostics; | |||
namespace MQTTnet.Core.ManagedClient | |||
{ | |||
public class ManagedMqttClientMessagesManager | |||
{ | |||
private readonly IList<MqttApplicationMessage> _persistedMessages = new List<MqttApplicationMessage>(); | |||
private readonly ManagedMqttClientOptions _options; | |||
public ManagedMqttClientMessagesManager(ManagedMqttClientOptions options) | |||
{ | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
public async Task LoadMessagesAsync() | |||
{ | |||
try | |||
{ | |||
var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync(); | |||
lock (_persistedMessages) | |||
{ | |||
_persistedMessages.Clear(); | |||
foreach (var persistentMessage in persistentMessages) | |||
{ | |||
_persistedMessages.Add(persistentMessage); | |||
} | |||
} | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while loading persistent messages."); | |||
} | |||
} | |||
public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage) | |||
{ | |||
if (applicationMessage != null) | |||
{ | |||
lock (_persistedMessages) | |||
{ | |||
_persistedMessages.Add(applicationMessage); | |||
} | |||
} | |||
try | |||
{ | |||
if (_options.Storage != null) | |||
{ | |||
await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages); | |||
} | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while saving persistent messages."); | |||
} | |||
} | |||
public List<MqttApplicationMessage> GetMessages() | |||
{ | |||
var persistedMessages = new List<MqttApplicationMessage>(); | |||
lock (_persistedMessages) | |||
{ | |||
foreach (var persistedMessage in _persistedMessages) | |||
{ | |||
persistedMessages.Add(persistedMessage); | |||
} | |||
} | |||
return persistedMessages; | |||
} | |||
public async Task Remove(MqttApplicationMessage message) | |||
{ | |||
lock (_persistedMessages) | |||
_persistedMessages.Remove(message); | |||
await SaveMessageAsync(null); | |||
} | |||
} | |||
} |
@@ -0,0 +1,108 @@ | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Linq; | |||
using System.Text; | |||
using MQTTnet.Core.Exceptions; | |||
using MQTTnet.Core.Protocol; | |||
namespace MQTTnet.Core | |||
{ | |||
public class MqttApplicationMessageBuilder | |||
{ | |||
private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; | |||
private string _topic; | |||
private byte[] _payload; | |||
private bool _retain; | |||
public MqttApplicationMessageBuilder WithTopic(string topic) | |||
{ | |||
_topic = topic; | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithPayload(IEnumerable<byte> payload) | |||
{ | |||
if (payload == null) | |||
{ | |||
_payload = null; | |||
return this; | |||
} | |||
_payload = payload.ToArray(); | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithPayload(MemoryStream payload) | |||
{ | |||
if (payload == null) | |||
{ | |||
_payload = null; | |||
return this; | |||
} | |||
if (payload.Length == 0) | |||
{ | |||
_payload = new byte[0]; | |||
} | |||
else | |||
{ | |||
_payload = new byte[payload.Length - payload.Position]; | |||
payload.Read(_payload, 0, _payload.Length); | |||
} | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithPayload(string payload) | |||
{ | |||
if (payload == null) | |||
{ | |||
_payload = null; | |||
return this; | |||
} | |||
_payload = string.IsNullOrEmpty(payload) ? new byte[0] : Encoding.UTF8.GetBytes(payload); | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) | |||
{ | |||
_qualityOfServiceLevel = qualityOfServiceLevel; | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithRetainFlag(bool value = true) | |||
{ | |||
_retain = value; | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithAtLeastOnceQoS() | |||
{ | |||
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithAtMostOnceQoS() | |||
{ | |||
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; | |||
return this; | |||
} | |||
public MqttApplicationMessageBuilder WithExactlyOnceQoS() | |||
{ | |||
_qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; | |||
return this; | |||
} | |||
public MqttApplicationMessage Build() | |||
{ | |||
if (string.IsNullOrEmpty(_topic)) | |||
{ | |||
throw new MqttProtocolViolationException("Topic is not set."); | |||
} | |||
return new MqttApplicationMessage(_topic, _payload ?? new byte[0], _qualityOfServiceLevel, _retain); | |||
} | |||
} | |||
} |
@@ -1,51 +0,0 @@ | |||
using System; | |||
using System.Globalization; | |||
using System.IO; | |||
using System.Text; | |||
using MQTTnet.Core.Protocol; | |||
namespace MQTTnet.Core | |||
{ | |||
public class MqttApplicationMessageFactory | |||
{ | |||
public MqttApplicationMessage CreateApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) | |||
{ | |||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | |||
if (payload == null) | |||
{ | |||
payload = new byte[0]; | |||
} | |||
return new MqttApplicationMessage(topic, payload, qualityOfServiceLevel, retain); | |||
} | |||
public MqttApplicationMessage CreateApplicationMessage<TPayload>(string topic, TPayload payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) | |||
{ | |||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | |||
var payloadString = Convert.ToString(payload, CultureInfo.InvariantCulture); | |||
var payloadBuffer = string.IsNullOrEmpty(payloadString) ? new byte[0] : Encoding.UTF8.GetBytes(payloadString); | |||
return CreateApplicationMessage(topic, payloadBuffer, qualityOfServiceLevel, retain); | |||
} | |||
public MqttApplicationMessage CreateApplicationMessage(string topic, Stream payload, MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false) | |||
{ | |||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | |||
byte[] payloadBuffer; | |||
if (payload == null || payload.Length == 0) | |||
{ | |||
payloadBuffer = new byte[0]; | |||
} | |||
else | |||
{ | |||
payloadBuffer = new byte[payload.Length - payload.Position]; | |||
payload.Read(payloadBuffer, 0, payloadBuffer.Length); | |||
} | |||
return CreateApplicationMessage(topic, payloadBuffer, qualityOfServiceLevel, retain); | |||
} | |||
} | |||
} |
@@ -1,7 +1,9 @@ | |||
namespace MQTTnet.Core.Server | |||
using MQTTnet.Core.Diagnostics; | |||
namespace MQTTnet.Core.Server | |||
{ | |||
public interface IMqttServerFactory | |||
{ | |||
IMqttServer CreateMqttServer(MqttServerOptions options); | |||
IMqttServer CreateMqttServer(MqttServerOptions options, IMqttNetTraceHandler traceHandler = null); | |||
} | |||
} |
@@ -15,9 +15,11 @@ namespace MQTTnet.Core.Server | |||
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>(); | |||
private readonly MqttClientSession _session; | |||
private readonly MqttServerOptions _options; | |||
private readonly MqttNetTrace _trace; | |||
public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session) | |||
public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_session = session ?? throw new ArgumentNullException(nameof(session)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
@@ -50,7 +52,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets."); | |||
_trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets."); | |||
} | |||
} | |||
@@ -66,18 +68,18 @@ namespace MQTTnet.Core.Server | |||
{ | |||
if (exception is MqttCommunicationTimedOutException) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout."); | |||
_trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout."); | |||
} | |||
else if (exception is MqttCommunicationException) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); | |||
_trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); | |||
} | |||
if (exception is OperationCanceledException) | |||
{ | |||
} | |||
else | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); | |||
_trace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed."); | |||
} | |||
if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | |||
@@ -10,11 +10,13 @@ namespace MQTTnet.Core.Server | |||
{ | |||
public sealed class MqttClientRetainedMessagesManager | |||
{ | |||
private readonly MqttNetTrace _trace; | |||
private readonly Dictionary<string, MqttPublishPacket> _retainedMessages = new Dictionary<string, MqttPublishPacket>(); | |||
private readonly MqttServerOptions _options; | |||
public MqttClientRetainedMessagesManager(MqttServerOptions options) | |||
public MqttClientRetainedMessagesManager(MqttServerOptions options, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
} | |||
@@ -39,7 +41,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while loading retained messages."); | |||
_trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while loading retained messages."); | |||
} | |||
} | |||
@@ -53,12 +55,12 @@ namespace MQTTnet.Core.Server | |||
if (publishPacket.Payload?.Any() == false) | |||
{ | |||
_retainedMessages.Remove(publishPacket.Topic); | |||
MqttNetTrace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, publishPacket.Topic); | |||
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' cleared retained message for topic '{1}'.", clientId, publishPacket.Topic); | |||
} | |||
else | |||
{ | |||
_retainedMessages[publishPacket.Topic] = publishPacket; | |||
MqttNetTrace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, publishPacket.Topic); | |||
_trace.Information(nameof(MqttClientRetainedMessagesManager), "Client '{0}' updated retained message for topic '{1}'.", clientId, publishPacket.Topic); | |||
} | |||
allRetainedMessages = new List<MqttPublishPacket>(_retainedMessages.Values); | |||
@@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while saving retained messages."); | |||
_trace.Error(nameof(MqttClientRetainedMessagesManager), exception, "Unhandled exception while saving retained messages."); | |||
} | |||
} | |||
@@ -20,17 +20,19 @@ namespace MQTTnet.Core.Server | |||
private readonly MqttClientSessionsManager _mqttClientSessionsManager; | |||
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; | |||
private readonly MqttServerOptions _options; | |||
private readonly MqttNetTrace _trace; | |||
private IMqttCommunicationAdapter _adapter; | |||
private CancellationTokenSource _cancellationTokenSource; | |||
private MqttApplicationMessage _willMessage; | |||
public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager) | |||
public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
ClientId = clientId; | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager)); | |||
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this); | |||
_pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this, trace); | |||
} | |||
public string ClientId { get; } | |||
@@ -58,11 +60,11 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (MqttCommunicationException exception) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); | |||
_trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); | |||
_trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); | |||
} | |||
} | |||
@@ -79,7 +81,7 @@ namespace MQTTnet.Core.Server | |||
_adapter = null; | |||
MqttNetTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); | |||
_trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId); | |||
} | |||
public void EnqueuePublishPacket(MqttPublishPacket publishPacket) | |||
@@ -92,7 +94,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
_pendingMessagesQueue.Enqueue(publishPacket); | |||
MqttNetTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); | |||
_trace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId); | |||
} | |||
public void Dispose() | |||
@@ -116,12 +118,12 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (MqttCommunicationException exception) | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); | |||
_trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId); | |||
Stop(); | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); | |||
_trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); | |||
Stop(); | |||
} | |||
} | |||
@@ -163,7 +165,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
else | |||
{ | |||
MqttNetTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); | |||
_trace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet); | |||
Stop(); | |||
} | |||
} | |||
@@ -17,11 +17,13 @@ namespace MQTTnet.Core.Server | |||
{ | |||
private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>(); | |||
private readonly MqttServerOptions _options; | |||
private readonly MqttNetTrace _trace; | |||
public MqttClientSessionsManager(MqttServerOptions options) | |||
public MqttClientSessionsManager(MqttServerOptions options, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
RetainedMessagesManager = new MqttClientRetainedMessagesManager(options); | |||
RetainedMessagesManager = new MqttClientRetainedMessagesManager(options, trace); | |||
} | |||
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | |||
@@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttServer), exception, exception.Message); | |||
_trace.Error(nameof(MqttServer), exception, exception.Message); | |||
} | |||
finally | |||
{ | |||
@@ -124,7 +126,7 @@ namespace MQTTnet.Core.Server | |||
} | |||
catch (Exception exception) | |||
{ | |||
MqttNetTrace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); | |||
_trace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message"); | |||
} | |||
lock (_clientSessions) | |||
@@ -158,11 +160,11 @@ namespace MQTTnet.Core.Server | |||
_clientSessions.Remove(connectPacket.ClientId); | |||
clientSession.Dispose(); | |||
clientSession = null; | |||
MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); | |||
_trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); | |||
} | |||
else | |||
{ | |||
MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); | |||
_trace.Verbose(nameof(MqttClientSessionsManager), "Reusing existing session of client '{0}'.", connectPacket.ClientId); | |||
} | |||
} | |||
@@ -171,10 +173,10 @@ namespace MQTTnet.Core.Server | |||
{ | |||
isExistingSession = false; | |||
clientSession = new MqttClientSession(connectPacket.ClientId, _options, this); | |||
clientSession = new MqttClientSession(connectPacket.ClientId, _options, this, _trace); | |||
_clientSessions[connectPacket.ClientId] = clientSession; | |||
MqttNetTrace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); | |||
_trace.Verbose(nameof(MqttClientSessionsManager), "Created a new session for client '{0}'.", connectPacket.ClientId); | |||
} | |||
return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; | |||
@@ -10,18 +10,20 @@ namespace MQTTnet.Core.Server | |||
{ | |||
public sealed class MqttServer : IMqttServer | |||
{ | |||
private readonly MqttNetTrace _trace; | |||
private readonly MqttClientSessionsManager _clientSessionsManager; | |||
private readonly ICollection<IMqttServerAdapter> _adapters; | |||
private readonly MqttServerOptions _options; | |||
private CancellationTokenSource _cancellationTokenSource; | |||
public MqttServer(MqttServerOptions options, ICollection<IMqttServerAdapter> adapters) | |||
public MqttServer(MqttServerOptions options, ICollection<IMqttServerAdapter> adapters, MqttNetTrace trace) | |||
{ | |||
_trace = trace ?? throw new ArgumentNullException(nameof(trace)); | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_adapters = adapters ?? throw new ArgumentNullException(nameof(adapters)); | |||
_clientSessionsManager = new MqttClientSessionsManager(options); | |||
_clientSessionsManager = new MqttClientSessionsManager(options, trace); | |||
_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); | |||
_clientSessionsManager.ClientConnected += OnClientConnected; | |||
_clientSessionsManager.ClientDisconnected += OnClientDisconnected; | |||
@@ -57,7 +59,7 @@ namespace MQTTnet.Core.Server | |||
await adapter.StartAsync(_options); | |||
} | |||
MqttNetTrace.Information(nameof(MqttServer), "Started."); | |||
_trace.Information(nameof(MqttServer), "Started."); | |||
} | |||
public async Task StopAsync() | |||
@@ -74,7 +76,7 @@ namespace MQTTnet.Core.Server | |||
_clientSessionsManager.Clear(); | |||
MqttNetTrace.Information(nameof(MqttServer), "Stopped."); | |||
_trace.Information(nameof(MqttServer), "Stopped."); | |||
} | |||
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) | |||
@@ -84,13 +86,13 @@ namespace MQTTnet.Core.Server | |||
private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) | |||
{ | |||
MqttNetTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); | |||
_trace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId); | |||
ClientConnected?.Invoke(this, eventArgs); | |||
} | |||
private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) | |||
{ | |||
MqttNetTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); | |||
_trace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId); | |||
ClientDisconnected?.Invoke(this, eventArgs); | |||
} | |||
} | |||
@@ -25,6 +25,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 | |||
EndProject | |||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" | |||
ProjectSection(SolutionItems) = preProject | |||
LICENSE = LICENSE | |||
README.md = README.md | |||
EndProjectSection | |||
EndProject | |||
@@ -6,7 +6,7 @@ | |||
[![BCH compliance](https://bettercodehub.com/edge/badge/chkr1011/MQTTnet?branch=master)](https://bettercodehub.com/) | |||
# MQTTnet | |||
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. | |||
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/. | |||
# Features | |||
@@ -18,7 +18,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien | |||
* Performance optimized (processing ~27.000 messages / second)* | |||
* Interfaces included for mocking and testing | |||
* Access to internal trace messages | |||
* Unit tested (62+ tests) | |||
* Unit tested (67+ tests) | |||
\* Tested on local machine with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetFramework_. | |||
@@ -61,4 +61,26 @@ This library is used in the following projects: | |||
* MQTT Client Rx (Wrapper for Reactive Extensions, https://github.com/1iveowl/MQTTClient.rx) | |||
* Wirehome (Open Source Home Automation system for .NET, https://github.com/chkr1011/Wirehome) | |||
If you use this library and want to see your project here please let me know. | |||
If you use this library and want to see your project here please let me know. | |||
# MIT License | |||
Copyright (c) 2017 Christian Kratky | |||
Permission is hereby granted, free of charge, to any person obtaining a copy | |||
of this software and associated documentation files (the "Software"), to deal | |||
in the Software without restriction, including without limitation the rights | |||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |||
copies of the Software, and to permit persons to whom the Software is | |||
furnished to do so, subject to the following conditions: | |||
The above copyright notice and this permission notice shall be included in all | |||
copies or substantial portions of the Software. | |||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |||
SOFTWARE. |
@@ -0,0 +1,61 @@ | |||
using System; | |||
using System.IO; | |||
using System.Text; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Core.Protocol; | |||
namespace MQTTnet.Core.Tests | |||
{ | |||
[TestClass] | |||
public class MqttApplicationMessageBuilderTests | |||
{ | |||
[TestMethod] | |||
public void CreateApplicationMessage_TopicOnly() | |||
{ | |||
var message = new MqttApplicationMessageBuilder().WithTopic("Abc").Build(); | |||
Assert.AreEqual("Abc", message.Topic); | |||
Assert.IsFalse(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); | |||
} | |||
[TestMethod] | |||
public void CreateApplicationMessage_TimeStampPayload() | |||
{ | |||
var message = new MqttApplicationMessageBuilder().WithTopic("xyz").WithPayload(TimeSpan.FromSeconds(360).ToString()).Build(); | |||
Assert.AreEqual("xyz", message.Topic); | |||
Assert.IsFalse(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); | |||
Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "00:06:00"); | |||
} | |||
[TestMethod] | |||
public void CreateApplicationMessage_StreamPayload() | |||
{ | |||
var stream = new MemoryStream(Encoding.UTF8.GetBytes("xHello")) { Position = 1 }; | |||
var message = new MqttApplicationMessageBuilder().WithTopic("123").WithPayload(stream).Build(); | |||
Assert.AreEqual("123", message.Topic); | |||
Assert.IsFalse(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); | |||
Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "Hello"); | |||
} | |||
[TestMethod] | |||
public void CreateApplicationMessage_Retained() | |||
{ | |||
var message = new MqttApplicationMessageBuilder().WithTopic("lol").WithRetainFlag().Build(); | |||
Assert.AreEqual("lol", message.Topic); | |||
Assert.IsTrue(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); | |||
} | |||
[TestMethod] | |||
public void CreateApplicationMessage_QosLevel2() | |||
{ | |||
var message = new MqttApplicationMessageBuilder().WithTopic("rofl").WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build(); | |||
Assert.AreEqual("rofl", message.Topic); | |||
Assert.IsTrue(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.ExactlyOnce, message.QualityOfServiceLevel); | |||
} | |||
} | |||
} |
@@ -1,30 +0,0 @@ | |||
using System; | |||
using System.Text; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Core.Protocol; | |||
namespace MQTTnet.Core.Tests | |||
{ | |||
[TestClass] | |||
public class MqttApplicationMessageFactoryTests | |||
{ | |||
[TestMethod] | |||
public void CreateApplicationMessage_TopicOnly() | |||
{ | |||
var message = new MqttApplicationMessageFactory().CreateApplicationMessage("Abc", MqttQualityOfServiceLevel.AtLeastOnce); | |||
Assert.AreEqual("Abc", message.Topic); | |||
Assert.IsFalse(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtLeastOnce, message.QualityOfServiceLevel); | |||
} | |||
[TestMethod] | |||
public void CreateApplicationMessage_TimeStampPayload() | |||
{ | |||
var message = new MqttApplicationMessageFactory().CreateApplicationMessage("xyz", TimeSpan.FromSeconds(360)); | |||
Assert.AreEqual("xyz", message.Topic); | |||
Assert.IsFalse(message.Retain); | |||
Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); | |||
Assert.AreEqual(Encoding.UTF8.GetString(message.Payload), "00:06:00"); | |||
} | |||
} | |||
} |
@@ -3,6 +3,7 @@ using System.Threading.Tasks; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
using MQTTnet.Core.Packets; | |||
using MQTTnet.Core.Protocol; | |||
using MQTTnet.Core.Server; | |||
@@ -49,7 +50,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_WillMessage() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); | |||
@@ -73,7 +74,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_Unsubscribe() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -109,7 +110,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_Publish() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -132,7 +133,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_NoRetainedMessage() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -155,7 +156,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_RetainedMessage() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -178,7 +179,7 @@ namespace MQTTnet.Core.Tests | |||
public async Task MqttServer_ClearRetainedMessage() | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -204,7 +205,7 @@ namespace MQTTnet.Core.Tests | |||
var storage = new TestStorage(); | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions { Storage = storage }, new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions { Storage = storage }, new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -213,7 +214,7 @@ namespace MQTTnet.Core.Tests | |||
await s.StopAsync(); | |||
s = new MqttServer(new MqttServerOptions { Storage = storage }, new List<IMqttServerAdapter> { serverAdapter }); | |||
s = new MqttServer(new MqttServerOptions { Storage = storage }, new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); | |||
@@ -235,7 +236,7 @@ namespace MQTTnet.Core.Tests | |||
public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages) | |||
{ | |||
_messages = messages; | |||
return Task.FromResult(0); | |||
return Task.CompletedTask; | |||
} | |||
public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() | |||
@@ -252,7 +253,7 @@ namespace MQTTnet.Core.Tests | |||
int expectedReceivedMessagesCount) | |||
{ | |||
var serverAdapter = new TestMqttServerAdapter(); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }); | |||
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter }, new MqttNetTrace()); | |||
await s.StartAsync(); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
@@ -3,6 +3,7 @@ using System.Threading.Tasks; | |||
using MQTTnet.Core.Adapter; | |||
using MQTTnet.Core.Server; | |||
using MQTTnet.Core.Client; | |||
using MQTTnet.Core.Diagnostics; | |||
namespace MQTTnet.Core.Tests | |||
{ | |||
@@ -17,7 +18,7 @@ namespace MQTTnet.Core.Tests | |||
adapterA.Partner = adapterB; | |||
adapterB.Partner = adapterA; | |||
var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA)); | |||
var client = new MqttClient(new MqttCommunicationAdapterFactory(adapterA), new MqttNetTrace()); | |||
var connected = WaitForClientToConnect(server, clientId); | |||
FireClientAcceptedEvent(adapterB); | |||
@@ -45,10 +45,10 @@ namespace MQTTnet.TestApp.NetCore | |||
{ | |||
MqttNetTrace.TraceMessagePublished += (s, e) => | |||
{ | |||
Console.WriteLine($">> [{DateTime.Now:O}] [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||
if (e.Exception != null) | |||
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
Console.WriteLine(e.Exception); | |||
Console.WriteLine(e.TraceMessage.Exception); | |||
} | |||
}; | |||
@@ -134,10 +134,10 @@ namespace MQTTnet.TestApp.NetCore | |||
{ | |||
MqttNetTrace.TraceMessagePublished += (s, e) => | |||
{ | |||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||
if (e.Exception != null) | |||
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
Console.WriteLine(e.Exception); | |||
Console.WriteLine(e.TraceMessage.Exception); | |||
} | |||
}; | |||
@@ -162,11 +162,11 @@ namespace MQTTnet.TestApp.NetCore | |||
options.Storage = new RetainedMessageHandler(); | |||
var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); | |||
options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); | |||
options.ConnectionBacklog = 5; | |||
options.DefaultEndpointOptions.IsEnabled = true; | |||
options.TlsEndpointOptions.IsEnabled = false; | |||
//var certificate = new X509Certificate(@"C:\certs\test\test.cer", ""); | |||
//options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert); | |||
//options.ConnectionBacklog = 5; | |||
//options.DefaultEndpointOptions.IsEnabled = true; | |||
//options.TlsEndpointOptions.IsEnabled = false; | |||
var mqttServer = new MqttServerFactory().CreateMqttServer(options); | |||
mqttServer.ClientDisconnected += (s, e) => | |||
@@ -189,6 +189,51 @@ namespace MQTTnet.TestApp.NetCore | |||
Console.ReadLine(); | |||
return Task.FromResult(0); | |||
} | |||
// ReSharper disable once UnusedMember.Local | |||
private static async void WikiCode() | |||
{ | |||
{ | |||
var client = new MqttClientFactory().CreateMqttClient(new CustomTraceHandler("Client 1")); | |||
var message = new MqttApplicationMessageBuilder() | |||
.WithTopic("MyTopic") | |||
.WithPayload("Hello World") | |||
.WithExactlyOnceQoS() | |||
.WithRetainFlag() | |||
.Build(); | |||
await client.PublishAsync(message); | |||
} | |||
{ | |||
var message = new MqttApplicationMessageBuilder() | |||
.WithTopic("/MQTTnet/is/awesome") | |||
.Build(); | |||
} | |||
} | |||
} | |||
public class CustomTraceHandler : IMqttNetTraceHandler | |||
{ | |||
private readonly string _clientId; | |||
public CustomTraceHandler(string clientId) | |||
{ | |||
_clientId = clientId; | |||
} | |||
public bool IsEnabled { get; } = true; | |||
public void HandleTraceMessage(MqttNetTraceMessage traceMessage) | |||
{ | |||
// Client ID is added to the trace message. | |||
Console.WriteLine($">> [{_clientId}] [{traceMessage.Timestamp:O}] [{traceMessage.ThreadId}] [{traceMessage.Source}] [{traceMessage.Level}]: {traceMessage.Message}"); | |||
if (traceMessage.Exception != null) | |||
{ | |||
Console.WriteLine(traceMessage.Exception); | |||
} | |||
} | |||
} | |||
public class RetainedMessageHandler : IMqttServerStorage | |||
@@ -213,7 +258,7 @@ namespace MQTTnet.TestApp.NetCore | |||
{ | |||
retainedMessages = new List<MqttApplicationMessage>(); | |||
} | |||
return Task.FromResult(retainedMessages); | |||
} | |||
} | |||
@@ -29,10 +29,10 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
{ | |||
await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.High, () => | |||
{ | |||
var text = $"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] [{e.Level}] [{e.Source}] [{e.ThreadId}] [{e.Message}]{Environment.NewLine}"; | |||
if (e.Exception != null) | |||
var text = $"[{e.TraceMessage.Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{e.TraceMessage.Level}] [{e.TraceMessage.Source}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Message}]{Environment.NewLine}"; | |||
if (e.TraceMessage.Exception != null) | |||
{ | |||
text += $"{e.Exception}{Environment.NewLine}"; | |||
text += $"{e.TraceMessage.Exception}{Environment.NewLine}"; | |||
} | |||
Trace.Text += text; | |||