@@ -14,7 +14,7 @@ namespace MQTTnet.Server | |||||
void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket); | void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket); | ||||
void ClearPendingApplicationMessages(); | void ClearPendingApplicationMessages(); | ||||
Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); | |||||
Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); | |||||
void Stop(MqttClientDisconnectType disconnectType); | void Stop(MqttClientDisconnectType disconnectType); | ||||
Task SubscribeAsync(IList<TopicFilter> topicFilters); | Task SubscribeAsync(IList<TopicFilter> topicFilters); | ||||
@@ -65,7 +65,7 @@ namespace MQTTnet.Server | |||||
status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; | status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; | ||||
} | } | ||||
public async Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||||
public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||||
{ | { | ||||
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); | if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); | ||||
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); | if (adapter == null) throw new ArgumentNullException(nameof(adapter)); | ||||
@@ -129,8 +129,6 @@ namespace MQTTnet.Server | |||||
_cancellationTokenSource?.Dispose(); | _cancellationTokenSource?.Dispose(); | ||||
_cancellationTokenSource = null; | _cancellationTokenSource = null; | ||||
} | } | ||||
return _wasCleanDisconnect; | |||||
} | } | ||||
public void Stop(MqttClientDisconnectType type) | public void Stop(MqttClientDisconnectType type) | ||||
@@ -157,6 +155,8 @@ namespace MQTTnet.Server | |||||
finally | finally | ||||
{ | { | ||||
_logger.Info("Client '{0}': Session stopped.", ClientId); | _logger.Info("Client '{0}': Session stopped.", ClientId); | ||||
_sessionsManager.Server.OnClientDisconnected(ClientId, _wasCleanDisconnect); | |||||
} | } | ||||
} | } | ||||
@@ -125,6 +125,7 @@ namespace MQTTnet.Server | |||||
{ | { | ||||
_sessions.Remove(clientId); | _sessions.Remove(clientId); | ||||
} | } | ||||
_logger.Verbose("Session for client '{0}' deleted.", clientId); | _logger.Verbose("Session for client '{0}' deleted.", clientId); | ||||
} | } | ||||
@@ -207,8 +208,7 @@ namespace MQTTnet.Server | |||||
private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) | private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) | ||||
{ | { | ||||
var clientId = string.Empty; | var clientId = string.Empty; | ||||
var wasCleanDisconnect = false; | |||||
try | try | ||||
{ | { | ||||
var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); | var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); | ||||
@@ -253,7 +253,7 @@ namespace MQTTnet.Server | |||||
Server.OnClientConnected(clientId); | Server.OnClientConnected(clientId); | ||||
wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); | |||||
await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); | |||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
{ | { | ||||
@@ -278,8 +278,6 @@ namespace MQTTnet.Server | |||||
{ | { | ||||
DeleteSession(clientId); | DeleteSession(clientId); | ||||
} | } | ||||
Server.OnClientDisconnected(clientId, wasCleanDisconnect); | |||||
} | } | ||||
} | } | ||||
@@ -1,15 +1,18 @@ | |||||
using System.Net.Sockets; | |||||
using System; | |||||
using System.Net.Sockets; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | using Microsoft.VisualStudio.TestTools.UnitTesting; | ||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Diagnostics; | |||||
using MQTTnet.Exceptions; | using MQTTnet.Exceptions; | ||||
using MQTTnet.Implementations; | |||||
using MQTTnet.Server; | |||||
namespace MQTTnet.Core.Tests | namespace MQTTnet.Core.Tests | ||||
{ | { | ||||
[TestClass] | [TestClass] | ||||
public class MqttClientTests | public class MqttClientTests | ||||
{ | { | ||||
[TestMethod] | [TestMethod] | ||||
public async Task ClientDisconnectException() | public async Task ClientDisconnectException() | ||||
{ | { | ||||
@@ -60,7 +60,7 @@ namespace MQTTnet.Core.Tests | |||||
{ | { | ||||
public string ClientId { get; } | public string ClientId { get; } | ||||
public int StopCalledCount { get; set; } | |||||
public int StopCalledCount { get; private set; } | |||||
public void FillStatus(MqttClientSessionStatus status) | public void FillStatus(MqttClientSessionStatus status) | ||||
{ | { | ||||
@@ -77,7 +77,7 @@ namespace MQTTnet.Core.Tests | |||||
throw new NotSupportedException(); | throw new NotSupportedException(); | ||||
} | } | ||||
public Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||||
public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||||
{ | { | ||||
throw new NotSupportedException(); | throw new NotSupportedException(); | ||||
} | } | ||||
@@ -484,6 +484,54 @@ namespace MQTTnet.Core.Tests | |||||
Assert.IsTrue(bodyIsMatching); | Assert.IsTrue(bodyIsMatching); | ||||
} | } | ||||
[TestMethod] | |||||
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() | |||||
{ | |||||
var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); | |||||
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); | |||||
var connectedClient = false; | |||||
var connecteCalledBeforeConnectedClients = false; | |||||
s.ClientConnected += (_, __) => | |||||
{ | |||||
connecteCalledBeforeConnectedClients |= connectedClient; | |||||
connectedClient = true; | |||||
}; | |||||
s.ClientDisconnected += (_, __) => | |||||
{ | |||||
connectedClient = false; | |||||
}; | |||||
var clientOptions = new MqttClientOptionsBuilder() | |||||
.WithTcpServer("localhost") | |||||
.WithClientId(Guid.NewGuid().ToString()) | |||||
.Build(); | |||||
await s.StartAsync(new MqttServerOptions()); | |||||
var c1 = new MqttFactory().CreateMqttClient(); | |||||
var c2 = new MqttFactory().CreateMqttClient(); | |||||
await c1.ConnectAsync(clientOptions); | |||||
await Task.Delay(100); | |||||
await c2.ConnectAsync(clientOptions); | |||||
await Task.Delay(100); | |||||
await c1.DisconnectAsync(); | |||||
await c2.DisconnectAsync(); | |||||
await s.StopAsync(); | |||||
await Task.Delay(100); | |||||
Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); | |||||
} | |||||
private class TestStorage : IMqttServerStorage | private class TestStorage : IMqttServerStorage | ||||
{ | { | ||||
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>(); | public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>(); | ||||