Kaynağa Gözat

Merge develop

release/3.x.x
Christian Kratky 6 yıl önce
ebeveyn
işleme
8b376874c0
2 değiştirilmiş dosya ile 53 ekleme ve 20 silme
  1. +51
    -20
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  2. +2
    -0
      Source/MQTTnet/Server/MqttServer.cs

+ 51
- 20
Source/MQTTnet/Server/MqttClientSessionsManager.cs Dosyayı Görüntüle

@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
@@ -12,10 +13,14 @@ using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public class MqttClientSessionsManager
public class MqttClientSessionsManager : IDisposable
{
private readonly BlockingCollection<MqttEnqueuedApplicationMessage> _messageQueue = new BlockingCollection<MqttEnqueuedApplicationMessage>();
private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new ConcurrentDictionary<string, MqttClientSession>();

/// <summary>
/// manual locking dictionaries is faster than using concurrent dictionary
/// </summary>
private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();

private readonly CancellationToken _cancellationToken;

@@ -44,12 +49,15 @@ namespace MQTTnet.Server

public void Stop()
{
foreach (var session in _sessions)
lock (_sessions)
{
session.Value.Stop(MqttClientDisconnectType.NotClean);
}
foreach (var session in _sessions)
{
session.Value.Stop(MqttClientDisconnectType.NotClean);
}

_sessions.Clear();
_sessions.Clear();
}
}

public Task StartSession(IMqttChannelAdapter clientAdapter)
@@ -60,10 +68,11 @@ namespace MQTTnet.Server
public Task<IList<IMqttClientSessionStatus>> GetClientStatusAsync()
{
var result = new List<IMqttClientSessionStatus>();
foreach (var session in _sessions)

foreach (var session in GetSessions())
{
var status = new MqttClientSessionStatus(this, session.Value);
session.Value.FillStatus(status);
var status = new MqttClientSessionStatus(this, session);
session.FillStatus(status);

result.Add(status);
}
@@ -83,12 +92,15 @@ namespace MQTTnet.Server
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

if (!_sessions.TryGetValue(clientId, out var session))
lock (_sessions)
{
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}
if (!_sessions.TryGetValue(clientId, out var session))
{
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}

return session.SubscribeAsync(topicFilters);
return session.SubscribeAsync(topicFilters);
}
}

public Task UnsubscribeAsync(string clientId, IList<string> topicFilters)
@@ -96,20 +108,31 @@ namespace MQTTnet.Server
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

if (!_sessions.TryGetValue(clientId, out var session))
lock (_sessions)
{
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}
if (!_sessions.TryGetValue(clientId, out var session))
{
throw new InvalidOperationException($"Client session '{clientId}' is unknown.");
}

return session.UnsubscribeAsync(topicFilters);
return session.UnsubscribeAsync(topicFilters);
}
}

public void DeleteSession(string clientId)
{
_sessions.TryRemove(clientId, out _);
lock (_sessions)
{
_sessions.Remove(clientId);
}
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public void Dispose()
{
_messageQueue?.Dispose();
}

private void ProcessQueuedApplicationMessages(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
@@ -143,7 +166,7 @@ namespace MQTTnet.Server
_retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult();
}

foreach (var clientSession in _sessions.Values)
foreach (var clientSession in GetSessions())
{
clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage.ToPublishPacket());
}
@@ -158,6 +181,14 @@ namespace MQTTnet.Server
}
}

private List<MqttClientSession> GetSessions()
{
lock (_sessions)
{
return _sessions.Values.ToList();
}
}

private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
@@ -263,7 +294,7 @@ namespace MQTTnet.Server
{
if (connectPacket.CleanSession)
{
_sessions.TryRemove(connectPacket.ClientId, out _);
_sessions.Remove(connectPacket.ClientId);

clientSession.Stop(MqttClientDisconnectType.Clean);
clientSession.Dispose();


+ 2
- 0
Source/MQTTnet/Server/MqttServer.cs Dosyayı Görüntüle

@@ -123,6 +123,8 @@ namespace MQTTnet.Server
_cancellationTokenSource = null;

_retainedMessagesManager = null;

_clientSessionsManager?.Dispose();
_clientSessionsManager = null;
}
}


Yükleniyor…
İptal
Kaydet