Преглед изворни кода

Add overflow strategy for pending messages queue.

release/3.x.x
Christian пре 6 година
родитељ
комит
e14ea9a240
10 измењених фајлова са 103 додато и 71 уклоњено
  1. +3
    -1
      Build/MQTTnet.nuspec
  2. +23
    -28
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  3. +6
    -2
      Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs
  4. +26
    -10
      Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs
  5. +1
    -0
      Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
  6. +3
    -3
      Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
  7. +28
    -21
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  8. +2
    -5
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  9. +8
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs
  10. +3
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs

+ 3
- 1
Build/MQTTnet.nuspec Прегледај датотеку

@@ -10,9 +10,11 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> * [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads.
<releaseNotes> * [Core] Performance optimizations.
* [Client] Received messages are now processed in the worker thread by default. Added a new setting for switching back to dedicated threads.
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client.
* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan)
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 23
- 28
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs Прегледај датотеку

@@ -57,14 +57,14 @@ namespace MQTTnet.Client

_adapter = _adapterFactory.CreateClientAdapter(options, _logger);

_logger.Verbose<MqttClient>("Trying to connect with server.");
_logger.Verbose(this, "Trying to connect with server.");
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose<MqttClient>("Connection with server established.");
_logger.Verbose(this, "Connection with server established.");

StartReceivingPackets(_cancellationTokenSource.Token);

var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose<MqttClient>("MQTT connection with server established.");
_logger.Verbose(this, "MQTT connection with server established.");

_sendTracker.Restart();

@@ -76,12 +76,12 @@ namespace MQTTnet.Client
IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));

_logger.Info<MqttClient>("Connected.");
_logger.Info(this, "Connected.");
return new MqttClientConnectResult(connectResponse.IsSessionPresent);
}
catch (Exception exception)
{
_logger.Error<MqttClient>(exception, "Error while connecting with server.");
_logger.Error(this, exception, "Error while connecting with server.");
await DisconnectInternalAsync(null, exception).ConfigureAwait(false);

throw;
@@ -233,7 +233,7 @@ namespace MQTTnet.Client

private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
await _disconnectLock.WaitAsync();
await _disconnectLock.WaitAsync().ConfigureAwait(false);
try
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
@@ -245,7 +245,7 @@ namespace MQTTnet.Client
}
catch (Exception adapterException)
{
_logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
_logger.Warning(this, adapterException, "Error while disconnecting from adapter.");
}
finally
{
@@ -260,21 +260,16 @@ namespace MQTTnet.Client
await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);

if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
{
await _keepAliveMessageSenderTask.ConfigureAwait(false);
}

if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}

_logger.Verbose<MqttClient>("Disconnected from adapter.");
_logger.Verbose(this, "Disconnected from adapter.");
}
catch (Exception adapterException)
{
_logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
_logger.Warning(this, adapterException, "Error while disconnecting from adapter.");
}
finally
{
@@ -283,7 +278,7 @@ namespace MQTTnet.Client
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_logger.Info<MqttClient>("Disconnected.");
_logger.Info(this, "Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
}
@@ -329,7 +324,7 @@ namespace MQTTnet.Client
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning<MqttPacketDispatcher>($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'.");
_logger.Warning(this, null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Namespace);
throw;
}
finally
@@ -340,7 +335,7 @@ namespace MQTTnet.Client

private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
{
_logger.Verbose<MqttClient>("Start sending keep alive packets.");
_logger.Verbose(this, "Start sending keep alive packets.");

try
{
@@ -367,24 +362,24 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
_logger.Warning(this, exception, "MQTT communication exception while sending/receiving keep alive packets.");
}
else
{
_logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
_logger.Error(this, exception, "Unhandled exception while sending/receiving keep alive packets.");
}

await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
finally
{
_logger.Verbose<MqttClient>("Stopped sending keep alive packets.");
_logger.Verbose(this, "Stopped sending keep alive packets.");
}
}

private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
_logger.Verbose<MqttClient>("Start receiving packets.");
_logger.Verbose(this, "Start receiving packets.");

try
{
@@ -419,11 +414,11 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
_logger.Warning(this, exception, "MQTT communication exception while receiving packets.");
}
else
{
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");
_logger.Error(this, exception, "Unhandled exception while receiving packets.");
}

await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
@@ -431,7 +426,7 @@ namespace MQTTnet.Client
}
finally
{
_logger.Verbose<MqttClient>("Stopped receiving packets.");
_logger.Verbose(this, "Stopped receiving packets.");
}
}

@@ -467,7 +462,7 @@ namespace MQTTnet.Client
}
catch (Exception exception)
{
_logger.Error<MqttClient>(exception, "Unhandled exception while processing received packet.");
_logger.Error(this, exception, "Unhandled exception while processing received packet.");
}
}

@@ -508,7 +503,7 @@ namespace MQTTnet.Client
private void StartReceivingPackets(CancellationToken cancellationToken)
{
_packetReceiverTask = Task.Factory.StartNew(
() => ReceivePacketsAsync(cancellationToken),
() => ReceivePacketsAsync(cancellationToken),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
@@ -518,7 +513,7 @@ namespace MQTTnet.Client
{
_keepAliveMessageSenderTask = Task.Factory.StartNew(
() => SendKeepAliveMessagesAsync(cancellationToken),
cancellationToken,
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}
@@ -539,7 +534,7 @@ namespace MQTTnet.Client
}
catch (Exception exception)
{
_logger.Error<MqttClient>(exception, "Unhandled exception while handling application message.");
_logger.Error(this, exception, "Unhandled exception while handling application message.");
}
}



+ 6
- 2
Frameworks/MQTTnet.NetStandard/Diagnostics/IMqttNetLogger.cs Прегледај датотеку

@@ -8,14 +8,18 @@ namespace MQTTnet.Diagnostics

void Verbose<TSource>(string message, params object[] parameters);

void Verbose(object source, string message, params object[] parameters);

void Info<TSource>(string message, params object[] parameters);

void Info(object source, string message, params object[] parameters);

void Warning<TSource>(Exception exception, string message, params object[] parameters);

void Warning<TSource>(string message, params object[] parameters);
void Warning(object source, Exception exception, string message, params object[] parameters);

void Error<TSource>(Exception exception, string message, params object[] parameters);

void Error<TSource>(string message, params object[] parameters);
void Error(object source, Exception exception, string message, params object[] parameters);
}
}

+ 26
- 10
Frameworks/MQTTnet.NetStandard/Diagnostics/MqttNetLogger.cs Прегледај датотеку

@@ -15,35 +15,45 @@ namespace MQTTnet.Diagnostics

public void Verbose<TSource>(string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters);
Publish(MqttNetLogLevel.Verbose, typeof(TSource), message, parameters, null);
}

public void Verbose(object source, string message, params object[] parameters)
{
Publish(MqttNetLogLevel.Verbose, source, message, parameters, null);
}

public void Info<TSource>(string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Info, null, message, parameters);
Publish(MqttNetLogLevel.Info, typeof(TSource), message, parameters, null);
}

public void Info(object source, string message, params object[] parameters)
{
Publish(MqttNetLogLevel.Info, source, message, parameters, null);
}

public void Warning<TSource>(Exception exception, string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Warning, exception, message, parameters);
Publish(MqttNetLogLevel.Warning, typeof(TSource), message, parameters, null);
}

public void Warning<TSource>(string message, params object[] parameters)
public void Warning(object source, Exception exception, string message, params object[] parameters)
{
Warning<TSource>(null, message, parameters);
Publish(MqttNetLogLevel.Warning, source, message, parameters, null);
}

public void Error<TSource>(Exception exception, string message, params object[] parameters)
{
Publish<TSource>(MqttNetLogLevel.Error, exception, message, parameters);
Publish(MqttNetLogLevel.Error, typeof(TSource), message, parameters, null);
}

public void Error<TSource>(string message, params object[] parameters)
public void Error(object source, Exception exception, string message, params object[] parameters)
{
Warning<TSource>(null, message, parameters);
Publish(MqttNetLogLevel.Error, source, message, parameters, null);
}

private void Publish<TSource>(MqttNetLogLevel logLevel, Exception exception, string message, object[] parameters)
private void Publish(MqttNetLogLevel logLevel, object source, string message, object[] parameters, Exception exception)
{
var hasLocalListeners = LogMessagePublished != null;
var hasGlobalListeners = MqttNetGlobalLogger.HasListeners;
@@ -58,7 +68,13 @@ namespace MQTTnet.Diagnostics
message = string.Format(message, parameters);
}

var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, typeof(TSource).Name, logLevel, message, exception);
string sourceName = null;
if (source != null)
{
sourceName = source.GetType().Name;
}

var traceMessage = new MqttNetLogMessage(_logId, DateTime.Now, Environment.CurrentManagedThreadId, sourceName, logLevel, message, exception);

if (hasGlobalListeners)
{


+ 1
- 0
Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs Прегледај датотеку

@@ -6,6 +6,7 @@ namespace MQTTnet.Server
{
int ConnectionBacklog { get; }
int MaxPendingMessagesPerClient { get; }
MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }

TimeSpan DefaultCommunicationTimeout { get; }



+ 3
- 3
Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs Прегледај датотеку

@@ -59,7 +59,7 @@ namespace MQTTnet.Server
// Values described here: [MQTT-3.1.2-24].
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D)
{
_logger.Warning<MqttClientSession>("Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
_logger.Warning(this, null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId);

_timeoutCallback?.Invoke();

@@ -74,11 +74,11 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
_logger.Error(this, exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
}
finally
{
_logger.Verbose<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId);
_logger.Verbose(this, "Client {0}: Stopped checking keep alive timeout.", _clientId);
}
}



+ 28
- 21
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs Прегледај датотеку

@@ -50,25 +50,27 @@ namespace MQTTnet.Server
}
}

public async Task DropPacket()
{
MqttBasePacket packet = null;
await _queueWaitSemaphore.WaitAsync().ConfigureAwait(false);
if (!_queue.TryDequeue(out packet))
{
throw new InvalidOperationException(); // should not happen
}
_queueWaitSemaphore.Release();
}

public void Enqueue(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

if (_queue.Count >= _options.MaxPendingMessagesPerClient)
{
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
return;
}

if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
_queue.TryDequeue(out _);
}
}

_queue.Enqueue(packet);
_queueAutoResetEvent.Set();

_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
_logger.Verbose(this, "Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
}

private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
@@ -77,7 +79,7 @@ namespace MQTTnet.Server
{
while (!cancellationToken.IsCancellationRequested)
{
await SendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
await TrySendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -85,19 +87,23 @@ namespace MQTTnet.Server
}
catch (Exception exception)
{
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
_logger.Error(this, exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
}
}

private async Task SendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
private async Task TrySendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
MqttBasePacket packet = null;
try
{
await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false);
if (_queue.IsEmpty)
{
await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false);
}

if (!_queue.TryDequeue(out packet))
{
throw new InvalidOperationException(); // should not happen
return;
}

if (cancellationToken.IsCancellationRequested)
@@ -107,24 +113,25 @@ namespace MQTTnet.Server

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false);

_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).",
_clientSession.ClientId);
}
catch (Exception exception)
{
if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId);
_logger.Warning(this, exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId);
}
else if (exception is MqttCommunicationException)
{
_logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId);
_logger.Warning(this, exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId);
}
else if (exception is OperationCanceledException)
{
}
else
{
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
_logger.Error(this, exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
}

if (packet is MqttPublishPacket publishPacket)


+ 2
- 5
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs Прегледај датотеку

@@ -147,10 +147,7 @@ namespace MQTTnet.Server
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}
if (_options.MaxPendingMessagesPerClient <= PendingMessagesQueue.Count)
{
await PendingMessagesQueue.DropPacket();
}

PendingMessagesQueue.Enqueue(publishPacket);
}

@@ -276,7 +273,7 @@ namespace MQTTnet.Server
return Task.FromResult(0);
}

_logger.Warning<MqttClientSession>("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
_logger.Warning(this, null, "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
Stop(MqttClientDisconnectType.NotClean);

return Task.FromResult(0);


+ 8
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttPendingMessagesOverflowStrategy.cs Прегледај датотеку

@@ -0,0 +1,8 @@
namespace MQTTnet.Server
{
public enum MqttPendingMessagesOverflowStrategy
{
DropOldestQueuedMessage,
DropNewMessage
}
}

+ 3
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs Прегледај датотеку

@@ -11,7 +11,9 @@ namespace MQTTnet.Server
public int ConnectionBacklog { get; set; } = 10;

public int MaxPendingMessagesPerClient { get; set; } = 250;

public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage;

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);

public Action<MqttConnectionValidatorContext> ConnectionValidator { get; set; }


Loading…
Откажи
Сачувај