Browse Source

Small refactorings

release/3.x.x
Christian Kratky 7 years ago
parent
commit
daf22c762f
3 changed files with 18 additions and 13 deletions
  1. +2
    -2
      MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
  2. +15
    -11
      MQTTnet.Core/Server/MqttClientSession.cs
  3. +1
    -0
      MQTTnet.Core/Server/MqttClientSessionsManager.cs

+ 2
- 2
MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs View File

@@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


Task.Run(() => SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken);
Task.Factory.StartNew(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
} }


public void Enqueue(MqttPublishPacket publishPacket) public void Enqueue(MqttPublishPacket publishPacket)
@@ -74,7 +74,7 @@ namespace MQTTnet.Core.Server
{ {
_trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception."); _trace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception.");
} }
if (exception is OperationCanceledException)
else if (exception is OperationCanceledException)
{ {
} }
else else


+ 15
- 11
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -45,10 +45,9 @@ namespace MQTTnet.Core.Server
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


_willMessage = willMessage;

try try
{ {
_willMessage = willMessage;
_adapter = adapter; _adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


@@ -70,18 +69,23 @@ namespace MQTTnet.Core.Server


public void Stop() public void Stop()
{ {
if (_willMessage != null)
try
{ {
_mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage);
}

_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;


_adapter = null;
_adapter = null;


_trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
_trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
}
finally
{
if (_willMessage != null)
{
_mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage);
}
}
} }


public void EnqueuePublishPacket(MqttPublishPacket publishPacket) public void EnqueuePublishPacket(MqttPublishPacket publishPacket)


+ 1
- 0
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -160,6 +160,7 @@ namespace MQTTnet.Core.Server
_clientSessions.Remove(connectPacket.ClientId); _clientSessions.Remove(connectPacket.ClientId);
clientSession.Dispose(); clientSession.Dispose();
clientSession = null; clientSession = null;

_trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId); _trace.Verbose(nameof(MqttClientSessionsManager), "Disposed existing session of client '{0}'.", connectPacket.ClientId);
} }
else else


Loading…
Cancel
Save