diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 2be9d25..5f965a3 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -10,9 +10,11 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
- * [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads.
+ * [Core] Performance optimizations.
+* [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads.
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client.
+* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan)
Copyright Christian Kratky 2016-2018
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
index edf9b44..4f28a90 100644
--- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
+++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
@@ -57,14 +57,14 @@ namespace MQTTnet.Client
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);
- _logger.Verbose("Trying to connect with server.");
+ _logger.Verbose(this, "Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
- _logger.Verbose("Connection with server established.");
+ _logger.Verbose(this, "Connection with server established.");
StartReceivingPackets(_cancellationTokenSource.Token);
var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false);
- _logger.Verbose("MQTT connection with server established.");
+ _logger.Verbose(this, "MQTT connection with server established.");
_sendTracker.Restart();
@@ -76,12 +76,12 @@ namespace MQTTnet.Client
IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
- _logger.Info("Connected.");
+ _logger.Info(this, "Connected.");
return new MqttClientConnectResult(connectResponse.IsSessionPresent);
}
catch (Exception exception)
{
- _logger.Error(exception, "Error while connecting with server.");
+ _logger.Error(this, exception, "Error while connecting with server.");
await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
throw;
@@ -233,7 +233,7 @@ namespace MQTTnet.Client
private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
- await _disconnectLock.WaitAsync();
+ await _disconnectLock.WaitAsync().ConfigureAwait(false);
try
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
@@ -245,7 +245,7 @@ namespace MQTTnet.Client
}
catch (Exception adapterException)
{
- _logger.Warning(adapterException, "Error while disconnecting from adapter.");
+ _logger.Warning(this, adapterException, "Error while disconnecting from adapter.");
}
finally
{
@@ -260,21 +260,16 @@ namespace MQTTnet.Client
await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);
- if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
- {
- await _keepAliveMessageSenderTask.ConfigureAwait(false);
- }
-
if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
- _logger.Verbose("Disconnected from adapter.");
+ _logger.Verbose(this, "Disconnected from adapter.");
}
catch (Exception adapterException)
{
- _logger.Warning(adapterException, "Error while disconnecting from adapter.");
+ _logger.Warning(this, adapterException, "Error while disconnecting from adapter.");
}
finally
{
@@ -283,7 +278,7 @@ namespace MQTTnet.Client
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
- _logger.Info("Disconnected.");
+ _logger.Info(this, "Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
}
@@ -329,7 +324,7 @@ namespace MQTTnet.Client
}
catch (MqttCommunicationTimedOutException)
{
- _logger.Warning($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'.");
+ _logger.Warning(this, null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Namespace);
throw;
}
finally
@@ -340,7 +335,7 @@ namespace MQTTnet.Client
private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
- _logger.Verbose("Start sending keep alive packets.");
+ _logger.Verbose(this, "Start sending keep alive packets.");
try
{
@@ -367,24 +362,24 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets.");
+ _logger.Warning(this, exception, "MQTT communication exception while sending/receiving keep alive packets.");
}
else
{
- _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
+ _logger.Error(this, exception, "Unhandled exception while sending/receiving keep alive packets.");
}
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
finally
{
- _logger.Verbose("Stopped sending keep alive packets.");
+ _logger.Verbose(this, "Stopped sending keep alive packets.");
}
}
private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
- _logger.Verbose("Start receiving packets.");
+ _logger.Verbose(this, "Start receiving packets.");
try
{
@@ -419,11 +414,11 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "MQTT communication exception while receiving packets.");
+ _logger.Warning(this, exception, "MQTT communication exception while receiving packets.");
}
else
{
- _logger.Error(exception, "Unhandled exception while receiving packets.");
+ _logger.Error(this, exception, "Unhandled exception while receiving packets.");
}
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
@@ -431,7 +426,7 @@ namespace MQTTnet.Client
}
finally
{
- _logger.Verbose("Stopped receiving packets.");
+ _logger.Verbose(this, "Stopped receiving packets.");
}
}
@@ -467,7 +462,7 @@ namespace MQTTnet.Client
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while processing received packet.");
+ _logger.Error(this, exception, "Unhandled exception while processing received packet.");
}
}
@@ -508,7 +503,7 @@ namespace MQTTnet.Client
private void StartReceivingPackets(CancellationToken cancellationToken)
{
_packetReceiverTask = Task.Factory.StartNew(
- () => ReceivePacketsAsync(cancellationToken),
+ () => ReceivePacketsAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
@@ -518,7 +513,7 @@ namespace MQTTnet.Client
{
_keepAliveMessageSenderTask = Task.Factory.StartNew(
() => SendKeepAliveMessagesAsync(cancellationToken),
- cancellationToken,
+ cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}
@@ -539,7 +534,7 @@ namespace MQTTnet.Client
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while handling application message.");
+ _logger.Error(this, exception, "Unhandled exception while handling application message.");
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
index 6ed16de..b7d85fe 100644
--- a/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
+++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
@@ -8,14 +8,18 @@ namespace MQTTnet.Diagnostics
void Verbose(string message, params object[] parameters);
+ void Verbose(object source, string message, params object[] parameters);
+
void Info(string message, params object[] parameters);
+ void Info(object source, string message, params object[] parameters);
+
void Warning(Exception exception, string message, params object[] parameters);
- void Warning(string message, params object[] parameters);
+ void Warning(object source, Exception exception, string message, params object[] parameters);
void Error(Exception exception, string message, params object[] parameters);
- void Error(string message, params object[] parameters);
+ void Error(object source, Exception exception, string message, params object[] parameters);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
index 53598b2..9421921 100644
--- a/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
+++ b/Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
@@ -15,35 +15,45 @@ namespace MQTTnet.Diagnostics
public void Verbose(string message, params object[] parameters)
{
- Publish(MqttNetLogLevel.Verbose, null, message, parameters);
+ Publish(MqttNetLogLevel.Verbose, typeof(TSource), message, parameters, null);
+ }
+
+ public void Verbose(object source, string message, params object[] parameters)
+ {
+ Publish(MqttNetLogLevel.Verbose, source, message, parameters, null);
}
public void Info(string message, params object[] parameters)
{
- Publish(MqttNetLogLevel.Info, null, message, parameters);
+ Publish(MqttNetLogLevel.Info, typeof(TSource), message, parameters, null);
+ }
+
+ public void Info(object source, string message, params object[] parameters)
+ {
+ Publish(MqttNetLogLevel.Info, source, message, parameters, null);
}
public void Warning(Exception exception, string message, params object[] parameters)
{
- Publish(MqttNetLogLevel.Warning, exception, message, parameters);
+ Publish(MqttNetLogLevel.Warning, typeof(TSource), message, parameters, null);
}
- public void Warning(string message, params object[] parameters)
+ public void Warning(object source, Exception exception, string message, params object[] parameters)
{
- Warning(null, message, parameters);
+ Publish(MqttNetLogLevel.Warning, source, message, parameters, null);
}
public void Error(Exception exception, string message, params object[] parameters)
{
- Publish(MqttNetLogLevel.Error, exception, message, parameters);
+ Publish(MqttNetLogLevel.Error, typeof(TSource), message, parameters, null);
}
- public void Error(string message, params object[] parameters)
+ public void Error(object source, Exception exception, string message, params object[] parameters)
{
- Warning(null, message, parameters);
+ Publish(MqttNetLogLevel.Error, source, message, parameters, null);
}
- private void Publish(MqttNetLogLevel logLevel, Exception exception, string message, object[] parameters)
+ private void Publish(MqttNetLogLevel logLevel, object source, string message, object[] parameters, Exception exception)
{
var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLogger.HasListeners;
@@ -58,7 +68,13 @@ namespace MQTTnet.Diagnostics
message = string.Format(message, parameters);
}
- var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, typeof(TSource).Name, logLevel, message, exception);
+ string sourceName = null;
+ if (source != null)
+ {
+ sourceName = source.GetType().Name;
+ }
+
+ var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, sourceName, logLevel, message, exception);
if (hasGlobalListeners)
{
diff --git a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
index 9ae7715..3637319 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
@@ -6,6 +6,7 @@ namespace MQTTnet.Server
{
int ConnectionBacklog { get; }
int MaxPendingMessagesPerClient { get; }
+ MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }
TimeSpan DefaultCommunicationTimeout { get; }
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
index 8dc96e4..ac95807 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
@@ -59,7 +59,7 @@ namespace MQTTnet.Server
// Values described here: [MQTT-3.1.2-24].
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D)
{
- _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
+ _logger.Warning(this, null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
_timeoutCallback?.Invoke();
@@ -74,11 +74,11 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
- _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
+ _logger.Error(this, exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
}
finally
{
- _logger.Verbose("Client {0}: Stopped checking keep alive timeout.", _clientId);
+ _logger.Verbose(this, "Client {0}: Stopped checking keep alive timeout.", _clientId);
}
}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
index fd89691..a6e2204 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
@@ -50,25 +50,27 @@ namespace MQTTnet.Server
}
}
- public async Task DropPacket()
- {
- MqttBasePacket packet = null;
- await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false);
- if (!_queue.TryDequeue(out packet))
- {
- throw new InvalidOperationException(); // should not happen
- }
- _queueWaitSemaphore.Release();
- }
-
public void Enqueue(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
+ if (_queue.Count >= _options.MaxPendingMessagesPerClient)
+ {
+ if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
+ {
+ return;
+ }
+
+ if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
+ {
+ _queue.TryDequeue(out _);
+ }
+ }
+
_queue.Enqueue(packet);
_queueAutoResetEvent.Set();
- _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Verbose(this, "Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
}
private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
@@ -77,7 +79,7 @@ namespace MQTTnet.Server
{
while (!cancellationToken.IsCancellationRequested)
{
- await SendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
+ await TrySendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -85,19 +87,23 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Error(this, exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
}
}
- private async Task SendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
+ private async Task TrySendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
MqttBasePacket packet = null;
try
{
- await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false);
+ if (_queue.IsEmpty)
+ {
+ await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false);
+ }
+
if (!_queue.TryDequeue(out packet))
{
- throw new InvalidOperationException(); // should not happen
+ return;
}
if (cancellationToken.IsCancellationRequested)
@@ -107,24 +113,25 @@ namespace MQTTnet.Server
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false);
- _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Verbose("Enqueued packet sent (ClientId: {0}).",
+ _clientSession.ClientId);
}
catch (Exception exception)
{
if (exception is MqttCommunicationTimedOutException)
{
- _logger.Warning(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Warning(this, exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId);
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Warning(this, exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId);
}
else if (exception is OperationCanceledException)
{
}
else
{
- _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
+ _logger.Error(this, exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
}
if (packet is MqttPublishPacket publishPacket)
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
index 7d50bad..f4f8c99 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
@@ -147,10 +147,7 @@ namespace MQTTnet.Server
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}
- if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count)
- {
- await PendingMessagesQueue.DropPacket();
- }
+
PendingMessagesQueue.Enqueue(publishPacket);
}
@@ -276,7 +273,7 @@ namespace MQTTnet.Server
return Task.FromResult(0);
}
- _logger.Warning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
+ _logger.Warning(this, null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
Stop(MqttClientDisconnectType.NotClean);
return Task.FromResult(0);
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs
new file mode 100644
index 0000000..1601487
--- /dev/null
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs
@@ -0,0 +1,8 @@
+namespace MQTTnet.Server
+{
+ public enum MqttPendingMessagesOverflowStrategy
+ {
+ DropOldestQueuedMessage,
+ DropNewMessage
+ }
+}
diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs
index 4315fcf..e6117fd 100644
--- a/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs
+++ b/Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs
@@ -11,7 +11,9 @@ namespace MQTTnet.Server
public int ConnectionBacklog { get; set; } = 10;
public int MaxPendingMessagesPerClient { get; set; } = 250;
-
+
+ public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage;
+
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);
public Action ConnectionValidator { get; set; }