Kaynağa Gözat

Fix Session_Tests.Manage_Session_MaxParallel

release/3.x.x
SilverFox 3 yıl önce
ebeveyn
işleme
f99daee4d1
1 değiştirilmiş dosya ile 14 ekleme ve 10 silme
  1. +14
    -10
      Source/MQTTnet/Server/MqttClientSessionsManager.cs

+ 14
- 10
Source/MQTTnet/Server/MqttClientSessionsManager.cs Dosyayı Görüntüle

@@ -1,4 +1,4 @@
using MQTTnet.Adapter;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
@@ -21,7 +21,7 @@ namespace MQTTnet.Server
{
readonly BlockingCollection<MqttPendingApplicationMessage> _messageQueue = new BlockingCollection<MqttPendingApplicationMessage>();

readonly object _createConnectionSyncRoot = new object();
readonly AsyncLock _createConnectionSyncRoot = new AsyncLock();
readonly Dictionary<string, MqttClientConnection> _connections = new Dictionary<string, MqttClientConnection>();
readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();

@@ -98,7 +98,7 @@ namespace MQTTnet.Server
return;
}

var connection = CreateClientConnection(connectPacket, connectionValidatorContext, channelAdapter);
var connection = await CreateClientConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket.ClientId).ConfigureAwait(false);
await connection.RunAsync().ConfigureAwait(false);
}
@@ -387,9 +387,12 @@ namespace MQTTnet.Server
return context;
}

MqttClientConnection CreateClientConnection(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
async Task<MqttClientConnection> CreateClientConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
{
lock (_createConnectionSyncRoot)
MqttClientConnection existingConnection;
MqttClientConnection connection;

using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
MqttClientSession session;
lock (_sessions)
@@ -415,8 +418,6 @@ namespace MQTTnet.Server
_sessions[connectPacket.ClientId] = session;
}

MqttClientConnection existingConnection;
MqttClientConnection connection;
lock (_connections)
{
_connections.TryGetValue(connectPacket.ClientId, out existingConnection);
@@ -425,10 +426,13 @@ namespace MQTTnet.Server
_connections[connectPacket.ClientId] = connection;
}

existingConnection?.StopAsync(MqttClientDisconnectReason.SessionTakenOver).GetAwaiter().GetResult();

return connection;
if (existingConnection != null)
{
await existingConnection.StopAsync(MqttClientDisconnectReason.SessionTakenOver).ConfigureAwait(false);
}
}

return connection;
}

async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(IMqttServerApplicationMessageInterceptor interceptor, MqttClientConnection clientConnection, MqttApplicationMessage applicationMessage)


Yükleniyor…
İptal
Kaydet