瀏覽代碼

Wrap COM exceptions, close client connection when server stops, process retained messages from server

release/3.x.x
Christian Kratky 7 年之前
父節點
當前提交
8de87c8772
共有 10 個文件被更改,包括 201 次插入225 次删除
  1. +5
    -2
      Build/MQTTnet.nuspec
  2. +8
    -7
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  3. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
  4. +1
    -3
      Frameworks/MQTTnet.NetStandard/MqttFactory.cs
  5. +93
    -132
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  6. +1
    -2
      MQTTnet.Core/Server/IMqttServer.cs
  7. +1
    -1
      MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs
  8. +19
    -34
      MQTTnet.Core/Server/MqttClientSession.cs
  9. +64
    -21
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  10. +8
    -22
      MQTTnet.Core/Server/MqttServer.cs

+ 5
- 2
Build/MQTTnet.nuspec 查看文件

@@ -10,8 +10,11 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Client] Fixed WebSocket sub protocol negotiation (Thanks to @JanEggers)

<releaseNotes>* [Core] Fixed library reference issues for .NET 4.6 (Thanks to @JanEggers).
* [Core] Several COM exceptions are now wrapped properly resulting in less warnings in the trace.
* [Client] Fixed WebSocket sub protocol negotiation for ASP.NET Core 2 servers (Thanks to @JanEggers).
* [Server] Client connections are now closed when the server is stopped (Thanks to @zhudanfei).
* [Server] Published messages from the server are now retained (if set) (Thanks to @ChristianRiedl). BREAKING CHANGE!
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 8
- 7
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs 查看文件

@@ -19,9 +19,8 @@ namespace MQTTnet.Implementations
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
public Stream RawReceiveStream { get; private set; }
public Stream SendStream { get; private set; }
public Stream ReceiveStream { get; private set; }

public async Task ConnectAsync()
{
@@ -32,7 +31,7 @@ namespace MQTTnet.Implementations
}

_webSocket = new ClientWebSocket();
if (_options.RequestHeaders != null)
{
foreach (var requestHeader in _options.RequestHeaders)
@@ -64,13 +63,13 @@ namespace MQTTnet.Implementations
}

await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None).ConfigureAwait(false);
RawReceiveStream = new WebSocketStream(_webSocket);

SendStream = new WebSocketStream(_webSocket);
ReceiveStream = SendStream;
}

public async Task DisconnectAsync()
{
RawReceiveStream = null;

if (_webSocket == null)
{
return;
@@ -80,6 +79,8 @@ namespace MQTTnet.Implementations
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}

_webSocket = null;
}

public void Dispose()


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs 查看文件

@@ -43,7 +43,7 @@ namespace MQTTnet.Implementations
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
currentOffset += response.Count;
count -= response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);


+ 1
- 3
Frameworks/MQTTnet.NetStandard/MqttFactory.cs 查看文件

@@ -85,9 +85,7 @@ namespace MQTTnet
clientSessionsManager,
_serviceProvider.GetRequiredService<MqttClientSubscriptionsManager>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientSession>>(),
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>(),
_serviceProvider.GetRequiredService<IMqttClientRetainedMessageManager>()
);
_serviceProvider.GetRequiredService<ILogger<MqttClientPendingMessagesQueue>>());
}

public IMqttClient CreateMqttClient()


+ 93
- 132
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs 查看文件

@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
@@ -14,10 +15,12 @@ namespace MQTTnet.Core.Adapter
{
public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
{
private const uint ErrorOperationAborted = 0x800703E3;

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly ILogger<MqttChannelCommunicationAdapter> _logger;
private readonly IMqttCommunicationChannel _channel;
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer, ILogger<MqttChannelCommunicationAdapter> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -29,166 +32,87 @@ namespace MQTTnet.Core.Adapter

public async Task ConnectAsync(TimeSpan timeout)
{
try
{
await _channel.ConnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
await ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
}

public async Task DisconnectAsync(TimeSpan timeout)
{
try
{
await _channel.DisconnectAsync().TimeoutAfter(timeout).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
await ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
}

public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
{
try
await ExecuteAndWrapExceptionAsync(async () =>
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

foreach (var packet in packets)
try
{
if (packet == null)
foreach (var packet in packets)
{
continue;
}
if (packet == null)
{
continue;
}

_logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout);
_logger.LogInformation("TX >>> {0} [Timeout={1}]", packet, timeout);

var writeBuffer = PacketSerializer.Serialize(packet);
await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false);
}
var writeBuffer = PacketSerializer.Serialize(packet);
await _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false);
}

if (timeout > TimeSpan.Zero)
{
await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
if (timeout > TimeSpan.Zero)
{
await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
}
else
{
await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
else
finally
{
await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false);
_semaphore.Release();
}
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
finally
{
_semaphore.Release();
}
});
}

public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ReceivedMqttPacket receivedMqttPacket = null;
try
MqttBasePacket packet = null;
await ExecuteAndWrapExceptionAsync(async () =>
{
if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
}
else
ReceivedMqttPacket receivedMqttPacket = null;
try
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
}
if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
}
else
{
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
}

if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}
if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}

packet = PacketSerializer.Deserialize(receivedMqttPacket);
if (packet == null)
{
throw new MqttProtocolViolationException("Received malformed packet.");
}

var packet = PacketSerializer.Deserialize(receivedMqttPacket);
if (packet == null)
_logger.LogInformation("RX <<< {0}", packet);
}
finally
{
throw new MqttProtocolViolationException("Received malformed packet.");
receivedMqttPacket?.Dispose();
}
});

_logger.LogInformation("RX <<< {0}", packet);
return packet;
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
finally
{
receivedMqttPacket?.Dispose();
}
return packet;
}

private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
@@ -215,5 +139,42 @@ namespace MQTTnet.Core.Adapter

return new ReceivedMqttPacket(header, new MemoryStream(body, 0, body.Length));
}

private static async Task ExecuteAndWrapExceptionAsync(Func<Task> action)
{
try
{
await action().ConfigureAwait(false);
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (MqttCommunicationTimedOutException)
{
throw;
}
catch (MqttCommunicationException)
{
throw;
}
catch (COMException comException)
{
if ((uint)comException.HResult == ErrorOperationAborted)
{
throw new OperationCanceledException();
}

throw new MqttCommunicationException(comException);
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
}
}
}

+ 1
- 2
MQTTnet.Core/Server/IMqttServer.cs 查看文件

@@ -10,8 +10,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
event EventHandler<MqttServerStartedEventArgs> Started;

IList<ConnectedMqttClient> GetConnectedClients();
void Publish(IEnumerable<MqttApplicationMessage> applicationMessages);
Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync();

Task StartAsync();
Task StopAsync();


+ 1
- 1
MQTTnet.Core/Server/MqttClientPendingMessagesQueue.cs 查看文件

@@ -96,7 +96,7 @@ namespace MQTTnet.Core.Server
_pendingPublishPackets.Add(packet, CancellationToken.None);
}

_session.Stop();
await _session.StopAsync();
}
}
}


+ 19
- 34
MQTTnet.Core/Server/MqttClientSession.cs 查看文件

@@ -17,7 +17,6 @@ namespace MQTTnet.Core.Server
{
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();

private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;
private readonly MqttClientSubscriptionsManager _subscriptionsManager;
private readonly MqttClientSessionsManager _sessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
@@ -34,10 +33,8 @@ namespace MQTTnet.Core.Server
MqttClientSessionsManager sessionsManager,
MqttClientSubscriptionsManager subscriptionsManager,
ILogger<MqttClientSession> logger,
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger,
IMqttClientRetainedMessageManager clientRetainedMessageManager)
ILogger<MqttClientPendingMessagesQueue> messageQueueLogger)
{
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -82,7 +79,7 @@ namespace MQTTnet.Core.Server
}
}

public void Stop()
public async Task StopAsync()
{
try
{
@@ -90,17 +87,21 @@ namespace MQTTnet.Core.Server
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_adapter = null;
if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
_adapter = null;
}

_logger.LogInformation("Client '{0}': Disconnected.", ClientId);
_logger.LogInformation("Client '{0}': Session stopped.", ClientId);
}
finally
{
var willMessage = _willMessage;
var willMessage = _willMessage;
if (willMessage != null)
{
_willMessage = null; //clear willmessage so it is send just once
_sessionsManager.DispatchApplicationMessage(this, willMessage);
await _sessionsManager.DispatchApplicationMessageAsync(this, willMessage);
}
}
}
@@ -133,12 +134,12 @@ namespace MQTTnet.Core.Server
catch (MqttCommunicationException exception)
{
_logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
Stop();
await StopAsync();
}
catch (Exception exception)
{
_logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
Stop();
await StopAsync();
}
}

@@ -182,14 +183,11 @@ namespace MQTTnet.Core.Server

if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
Stop();
return Task.FromResult(0);
return StopAsync();
}

_logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
Stop();

return Task.FromResult(0);
return StopAsync();
}

private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
@@ -202,13 +200,13 @@ namespace MQTTnet.Core.Server
if (subscribeResult.CloseConnection)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false);
Stop();
await StopAsync();
}
}

private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
{
var retainedMessages = await _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket).ConfigureAwait(false);
var retainedMessages = await _sessionsManager.GetRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
foreach (var publishPacket in retainedMessages)
{
EnqueuePublishPacket(publishPacket.ToPublishPacket());
@@ -219,29 +217,16 @@ namespace MQTTnet.Core.Server
{
var applicationMessage = publishPacket.ToApplicationMessage();

var interceptorContext = new MqttApplicationMessageInterceptorContext
{
ApplicationMessage = applicationMessage
};

_options.ApplicationMessageInterceptor?.Invoke(interceptorContext);
applicationMessage = interceptorContext.ApplicationMessage;

if (applicationMessage.Retain)
{
await _clientRetainedMessageManager.HandleMessageAsync(ClientId, applicationMessage).ConfigureAwait(false);
}

switch (applicationMessage.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
_sessionsManager.DispatchApplicationMessage(this, applicationMessage);
await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage);
return;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
_sessionsManager.DispatchApplicationMessage(this, applicationMessage);
await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage);

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken,
new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
@@ -256,7 +241,7 @@ namespace MQTTnet.Core.Server
_unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
}

_sessionsManager.DispatchApplicationMessage(this, applicationMessage);
await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage);

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken,
new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);


+ 64
- 21
MQTTnet.Core/Server/MqttClientSessionsManager.cs 查看文件

@@ -16,19 +16,24 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientSessionsManager
{
private readonly Dictionary<string, MqttClientSession> _clientSessions = new Dictionary<string, MqttClientSession>();
private readonly Dictionary<string, MqttClientSession> _sessions = new Dictionary<string, MqttClientSession>();
private readonly SemaphoreSlim _sessionsSemaphore = new SemaphoreSlim(1, 1);

private readonly MqttServerOptions _options;
private readonly ILogger<MqttClientSessionsManager> _logger;
private readonly IMqttClientSesssionFactory _clientSesssionFactory;
private readonly MqttServerOptions _options;
private readonly IMqttClientRetainedMessageManager _clientRetainedMessageManager;

public MqttClientSessionsManager(
IOptions<MqttServerOptions> options,
IOptions<MqttServerOptions> options,
ILogger<MqttClientSessionsManager> logger,
IMqttClientSesssionFactory clientSesssionFactory)
IMqttClientSesssionFactory clientSesssionFactory,
IMqttClientRetainedMessageManager clientRetainedMessageManager)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_clientSesssionFactory = clientSesssionFactory ?? throw new ArgumentNullException(nameof(clientSesssionFactory));
_clientRetainedMessageManager = clientRetainedMessageManager ?? throw new ArgumentNullException(nameof(clientRetainedMessageManager));
}

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
@@ -61,7 +66,7 @@ namespace MQTTnet.Core.Server
return;
}

var clientSession = GetOrCreateClientSession(connectPacket);
var clientSession = await GetOrCreateClientSessionAsync(connectPacket);

await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
{
@@ -92,7 +97,7 @@ namespace MQTTnet.Core.Server
}
catch (Exception)
{
//ignored
// ignored
}

ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient
@@ -103,30 +108,58 @@ namespace MQTTnet.Core.Server
}
}

public void Clear()
public async Task StopAsync()
{
lock (_clientSessions)
await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
foreach (var session in _sessions)
{
await session.Value.StopAsync();
}

_sessions.Clear();
}
finally
{
_clientSessions.Clear();
_sessionsSemaphore.Release();
}
}

public IList<ConnectedMqttClient> GetConnectedClients()
public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
lock (_clientSessions)
await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return _clientSessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311
}).ToList();
}
finally
{
_sessionsSemaphore.Release();
}
}

public void DispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
public async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
try
{
var interceptorContext = new MqttApplicationMessageInterceptorContext
{
ApplicationMessage = applicationMessage
};

_options.ApplicationMessageInterceptor?.Invoke(interceptorContext);
applicationMessage = interceptorContext.ApplicationMessage;

if (applicationMessage.Retain)
{
await _clientRetainedMessageManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false);
}

var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, applicationMessage);
ApplicationMessageReceived?.Invoke(this, eventArgs);
}
@@ -135,15 +168,20 @@ namespace MQTTnet.Core.Server
_logger.LogError(new EventId(), exception, "Error while processing application message");
}

lock (_clientSessions)
lock (_sessions)
{
foreach (var clientSession in _clientSessions.Values.ToList())
foreach (var clientSession in _sessions.Values.ToList())
{
clientSession.EnqueuePublishPacket(applicationMessage.ToPublishPacket());
}
}
}

public Task<List<MqttApplicationMessage>> GetRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
{
return _clientRetainedMessageManager.GetSubscribedMessagesAsync(subscribePacket);
}

private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket)
{
if (_options.ConnectionValidator != null)
@@ -154,17 +192,18 @@ namespace MQTTnet.Core.Server
return MqttConnectReturnCode.ConnectionAccepted;
}

private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacket connectPacket)
private async Task<GetOrCreateClientSessionResult> GetOrCreateClientSessionAsync(MqttConnectPacket connectPacket)
{
lock (_clientSessions)
await _sessionsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var isSessionPresent = _clientSessions.TryGetValue(connectPacket.ClientId, out var clientSession);
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
if (isSessionPresent)
{
if (connectPacket.CleanSession)
{
_clientSessions.Remove(connectPacket.ClientId);
clientSession.Stop();
_sessions.Remove(connectPacket.ClientId);
await clientSession.StopAsync();
clientSession = null;

_logger.LogTrace("Stopped existing session of client '{0}'.", connectPacket.ClientId);
@@ -181,13 +220,17 @@ namespace MQTTnet.Core.Server
isExistingSession = false;

clientSession = _clientSesssionFactory.CreateClientSession(connectPacket.ClientId, this);
_clientSessions[connectPacket.ClientId] = clientSession;
_sessions[connectPacket.ClientId] = clientSession;

_logger.LogTrace("Created a new session for client '{0}'.", connectPacket.ClientId);
}

return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
}
finally
{
_sessionsSemaphore.Release();
}
}
}
}

+ 8
- 22
MQTTnet.Core/Server/MqttServer.cs 查看文件

@@ -20,12 +20,11 @@ namespace MQTTnet.Core.Server
private CancellationTokenSource _cancellationTokenSource;

public MqttServer(
IOptions<MqttServerOptions> options,
IOptions<MqttServerOptions> options,
IEnumerable<IMqttServerAdapter> adapters,
ILogger<MqttServer> logger,
MqttClientSessionsManager clientSessionsManager,
IMqttClientRetainedMessageManager clientRetainedMessageManager
)
IMqttClientRetainedMessageManager clientRetainedMessageManager)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -35,7 +34,7 @@ namespace MQTTnet.Core.Server
if (adapters == null)
{
throw new ArgumentNullException(nameof(adapters));
}
}

_adapters = adapters.ToList();

@@ -44,9 +43,9 @@ namespace MQTTnet.Core.Server
_clientSessionsManager.ClientDisconnected += OnClientDisconnected;
}

public IList<ConnectedMqttClient> GetConnectedClients()
public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
return _clientSessionsManager.GetConnectedClients();
return _clientSessionsManager.GetConnectedClientsAsync();
}

public event EventHandler<MqttServerStartedEventArgs> Started;
@@ -54,7 +53,7 @@ namespace MQTTnet.Core.Server
public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public void Publish(IEnumerable<MqttApplicationMessage> applicationMessages)
public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

@@ -65,23 +64,10 @@ namespace MQTTnet.Core.Server

foreach (var applicationMessage in applicationMessages)
{
var interceptorContext = new MqttApplicationMessageInterceptorContext
{
ApplicationMessage = applicationMessage
};

_options.ApplicationMessageInterceptor?.Invoke(interceptorContext);
_clientSessionsManager.DispatchApplicationMessage(null, interceptorContext.ApplicationMessage);
await _clientSessionsManager.DispatchApplicationMessageAsync(null, applicationMessage);
}
}

public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
Publish(applicationMessages);
return Task.FromResult(0);
}

public async Task StartAsync()
{
if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started.");
@@ -113,7 +99,7 @@ namespace MQTTnet.Core.Server
await adapter.StopAsync();
}

_clientSessionsManager.Clear();
await _clientSessionsManager.StopAsync();

_logger.LogInformation("Stopped.");
}


Loading…
取消
儲存