Browse Source

Process all queued message before server stops (#1234)

* process all queued message befor server stops

* fix an issue

Co-authored-by: ablfzl <ablfzl21@outlook.com>
release/3.x.x
Ablfzl Edgelolli 3 years ago
committed by GitHub
parent
commit
dd0aba0640
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 11 deletions
  1. +7
    -11
      Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs

+ 7
- 11
Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs View File

@@ -80,7 +80,7 @@ namespace MQTTnet.Server.Internal
_logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint); _logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
return null; return null;
} }
public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{ {
MqttClientConnection clientConnection = null; MqttClientConnection clientConnection = null;
@@ -95,7 +95,7 @@ namespace MQTTnet.Server.Internal
} }


MqttConnAckPacket connAckPacket; MqttConnAckPacket connAckPacket;
var connectionValidatorContext = await ValidateConnection(connectPacket, channelAdapter).ConfigureAwait(false); var connectionValidatorContext = await ValidateConnection(connectPacket, channelAdapter).ConfigureAwait(false);
if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{ {
@@ -272,11 +272,12 @@ namespace MQTTnet.Server.Internal


async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)
{ {
while (!cancellationToken.IsCancellationRequested)
//make sure all queued messages are proccessed befor server stops
while (!cancellationToken.IsCancellationRequested || _messageQueue.Any())
{ {
try try
{ {
await TryProcessNextQueuedApplicationMessageAsync(cancellationToken).ConfigureAwait(false);
await TryProcessNextQueuedApplicationMessageAsync().ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@@ -288,16 +289,14 @@ namespace MQTTnet.Server.Internal
} }
} }


async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken)
async Task TryProcessNextQueuedApplicationMessageAsync()
{ {
try try
{ {
cancellationToken.ThrowIfCancellationRequested();

MqttPendingApplicationMessage queuedApplicationMessage; MqttPendingApplicationMessage queuedApplicationMessage;
try try
{ {
queuedApplicationMessage = _messageQueue.Take(cancellationToken);
queuedApplicationMessage = _messageQueue.Take();
} }
catch (ArgumentNullException) catch (ArgumentNullException)
{ {
@@ -370,9 +369,6 @@ namespace MQTTnet.Server.Internal
await InterceptApplicationMessageAsync(undeliveredMessageInterceptor, clientConnection, applicationMessage).ConfigureAwait(false); await InterceptApplicationMessageAsync(undeliveredMessageInterceptor, clientConnection, applicationMessage).ConfigureAwait(false);
} }
} }
catch (OperationCanceledException)
{
}
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error(exception, "Unhandled exception while processing next queued application message."); _logger.Error(exception, "Unhandled exception while processing next queued application message.");


Loading…
Cancel
Save