@@ -15,7 +15,7 @@ | |||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
<dependencies> | <dependencies> | ||||
<dependency id="MQTTnet" version="2.8.3" /> | |||||
<dependency id="MQTTnet" version="2.8.4" /> | |||||
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.0" /> | <dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.0" /> | ||||
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.0.1" /> | <dependency id="Microsoft.AspNetCore.WebSockets" version="2.0.1" /> | ||||
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.0.1" /> | <dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.0.1" /> | ||||
@@ -15,7 +15,7 @@ | |||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
<dependencies> | <dependencies> | ||||
<dependency id="MQTTnet" version="2.8.3" /> | |||||
<dependency id="MQTTnet" version="2.8.4" /> | |||||
</dependencies> | </dependencies> | ||||
</metadata> | </metadata> | ||||
@@ -15,7 +15,7 @@ | |||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
<dependencies> | <dependencies> | ||||
<dependency id="MQTTnet" version="2.8.3" /> | |||||
<dependency id="MQTTnet" version="2.8.4" /> | |||||
</dependencies> | </dependencies> | ||||
</metadata> | </metadata> | ||||
@@ -10,10 +10,7 @@ | |||||
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | ||||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | <requireLicenseAcceptance>false</requireLicenseAcceptance> | ||||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | ||||
<releaseNotes>* [Core] Added all factory methods to the factory interface. | |||||
* [Core] Fixed an issue with cancellation token handling (thanks to @acrabb). | |||||
* [Server] Added a new overload for configuring the ASP.net integration (thanks to @JanEggers). | |||||
* [Server] Added a method for clearing all retained messages. | |||||
<releaseNotes>* [Client] Fixed a deadlock when an exception is fired while connecting (thanks to @malibVB). | |||||
</releaseNotes> | </releaseNotes> | ||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
@@ -150,6 +150,9 @@ namespace MQTTnet.Adapter | |||||
return packet; | return packet; | ||||
} | } | ||||
catch (OperationCanceledException) | |||||
{ | |||||
} | |||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
if (IsWrappedException(exception)) | if (IsWrappedException(exception)) | ||||
@@ -237,7 +240,8 @@ namespace MQTTnet.Adapter | |||||
{ | { | ||||
if (exception is IOException && exception.InnerException is SocketException socketException) | if (exception is IOException && exception.InnerException is SocketException socketException) | ||||
{ | { | ||||
if (socketException.SocketErrorCode == SocketError.ConnectionAborted) | |||||
if (socketException.SocketErrorCode == SocketError.ConnectionAborted || | |||||
socketException.SocketErrorCode == SocketError.OperationAborted) | |||||
{ | { | ||||
throw new OperationCanceledException(); | throw new OperationCanceledException(); | ||||
} | } | ||||
@@ -29,7 +29,7 @@ namespace MQTTnet.Client | |||||
internal Task _keepAliveMessageSenderTask; | internal Task _keepAliveMessageSenderTask; | ||||
private IMqttChannelAdapter _adapter; | private IMqttChannelAdapter _adapter; | ||||
private bool _cleanDisconnectInitiated; | private bool _cleanDisconnectInitiated; | ||||
private TaskCompletionSource<bool> _disconnectReason; | |||||
private int _disconnectGate; | |||||
public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) | public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) | ||||
{ | { | ||||
@@ -54,12 +54,12 @@ namespace MQTTnet.Client | |||||
try | try | ||||
{ | { | ||||
_cancellationTokenSource = new CancellationTokenSource(); | |||||
_disconnectReason = new TaskCompletionSource<bool>(); | |||||
_options = options; | _options = options; | ||||
_packetIdentifierProvider.Reset(); | _packetIdentifierProvider.Reset(); | ||||
_packetDispatcher.Reset(); | _packetDispatcher.Reset(); | ||||
_cancellationTokenSource = new CancellationTokenSource(); | |||||
_disconnectGate = 0; | |||||
_adapter = _adapterFactory.CreateClientAdapter(options, _logger); | _adapter = _adapterFactory.CreateClientAdapter(options, _logger); | ||||
_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); | _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); | ||||
@@ -87,10 +87,12 @@ namespace MQTTnet.Client | |||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
_logger.Error(exception, "Error while connecting with server."); | _logger.Error(exception, "Error while connecting with server."); | ||||
if (_disconnectReason.TrySetException(exception)) | |||||
if (!DisconnectIsPending()) | |||||
{ | { | ||||
await DisconnectInternalAsync(null, exception).ConfigureAwait(false); | await DisconnectInternalAsync(null, exception).ConfigureAwait(false); | ||||
} | } | ||||
throw; | throw; | ||||
} | } | ||||
} | } | ||||
@@ -108,7 +110,7 @@ namespace MQTTnet.Client | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
if (_disconnectReason.TrySetCanceled()) | |||||
if (!DisconnectIsPending()) | |||||
{ | { | ||||
await DisconnectInternalAsync(null, null).ConfigureAwait(false); | await DisconnectInternalAsync(null, null).ConfigureAwait(false); | ||||
} | } | ||||
@@ -183,7 +185,7 @@ namespace MQTTnet.Client | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
_cancellationTokenSource?.Cancel (false); | |||||
_cancellationTokenSource?.Cancel(false); | |||||
_cancellationTokenSource?.Dispose(); | _cancellationTokenSource?.Dispose(); | ||||
_cancellationTokenSource = null; | _cancellationTokenSource = null; | ||||
@@ -224,9 +226,10 @@ namespace MQTTnet.Client | |||||
private async Task DisconnectInternalAsync(Task sender, Exception exception) | private async Task DisconnectInternalAsync(Task sender, Exception exception) | ||||
{ | { | ||||
InitiateDisconnect(); | |||||
var clientWasConnected = IsConnected; | var clientWasConnected = IsConnected; | ||||
InitiateDisconnect(); | |||||
IsConnected = false; | IsConnected = false; | ||||
try | try | ||||
@@ -247,7 +250,7 @@ namespace MQTTnet.Client | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
Dispose (); | |||||
Dispose(); | |||||
_cleanDisconnectInitiated = false; | _cleanDisconnectInitiated = false; | ||||
_logger.Info("Disconnected."); | _logger.Info("Disconnected."); | ||||
@@ -261,16 +264,16 @@ namespace MQTTnet.Client | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) | |||||
if (_cancellationTokenSource?.IsCancellationRequested == true) | |||||
{ | { | ||||
return; | return; | ||||
} | } | ||||
_cancellationTokenSource.Cancel(false); | |||||
_cancellationTokenSource?.Cancel(false); | |||||
} | } | ||||
catch (Exception adapterException) | |||||
catch (Exception exception) | |||||
{ | { | ||||
_logger.Warning(adapterException, "Error while initiating disconnect."); | |||||
_logger.Warning(exception, "Error while initiating disconnect."); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -358,7 +361,7 @@ namespace MQTTnet.Client | |||||
_logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); | _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets."); | ||||
} | } | ||||
if (_disconnectReason.TrySetException(exception)) | |||||
if (!DisconnectIsPending()) | |||||
{ | { | ||||
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); | await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); | ||||
} | } | ||||
@@ -377,8 +380,10 @@ namespace MQTTnet.Client | |||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested) | while (!cancellationToken.IsCancellationRequested) | ||||
{ | { | ||||
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); | |||||
if (packet != null) | |||||
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken) | |||||
.ConfigureAwait(false); | |||||
if (packet != null && !cancellationToken.IsCancellationRequested) | |||||
{ | { | ||||
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); | await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); | ||||
} | } | ||||
@@ -393,7 +398,6 @@ namespace MQTTnet.Client | |||||
if (exception is OperationCanceledException) | if (exception is OperationCanceledException) | ||||
{ | { | ||||
_logger.Verbose ("MQTT OperationCanceled exception while receiving packets."); | |||||
} | } | ||||
else if (exception is MqttCommunicationException) | else if (exception is MqttCommunicationException) | ||||
{ | { | ||||
@@ -406,7 +410,7 @@ namespace MQTTnet.Client | |||||
_packetDispatcher.Dispatch(exception); | _packetDispatcher.Dispatch(exception); | ||||
if (_disconnectReason.TrySetException(exception)) | |||||
if (!DisconnectIsPending()) | |||||
{ | { | ||||
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); | await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); | ||||
} | } | ||||
@@ -492,16 +496,20 @@ namespace MQTTnet.Client | |||||
private void StartReceivingPackets(CancellationToken cancellationToken) | private void StartReceivingPackets(CancellationToken cancellationToken) | ||||
{ | { | ||||
_packetReceiverTask = Task.Run( | |||||
_packetReceiverTask = Task.Factory.StartNew( | |||||
() => ReceivePacketsAsync(cancellationToken), | () => ReceivePacketsAsync(cancellationToken), | ||||
cancellationToken); | |||||
cancellationToken, | |||||
TaskCreationOptions.LongRunning, | |||||
TaskScheduler.Default); | |||||
} | } | ||||
private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) | private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) | ||||
{ | { | ||||
_keepAliveMessageSenderTask = Task.Run( | |||||
_keepAliveMessageSenderTask = Task.Factory.StartNew( | |||||
() => SendKeepAliveMessagesAsync(cancellationToken), | () => SendKeepAliveMessagesAsync(cancellationToken), | ||||
cancellationToken); | |||||
cancellationToken, | |||||
TaskCreationOptions.LongRunning, | |||||
TaskScheduler.Default); | |||||
} | } | ||||
private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) | private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) | ||||
@@ -537,5 +545,10 @@ namespace MQTTnet.Client | |||||
{ | { | ||||
} | } | ||||
} | } | ||||
private bool DisconnectIsPending() | |||||
{ | |||||
return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0; | |||||
} | |||||
} | } | ||||
} | } |
@@ -8,6 +8,7 @@ using System.Collections.Generic; | |||||
using System.Text; | using System.Text; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Adapter; | |||||
using MQTTnet.Implementations; | using MQTTnet.Implementations; | ||||
namespace MQTTnet.Core.Tests | namespace MQTTnet.Core.Tests | ||||
@@ -552,6 +553,51 @@ namespace MQTTnet.Core.Tests | |||||
Assert.IsTrue(bodyIsMatching); | Assert.IsTrue(bodyIsMatching); | ||||
} | } | ||||
[TestMethod] | |||||
public async Task MqttServer_ConnectionDenied() | |||||
{ | |||||
var server = new MqttFactory().CreateMqttServer(); | |||||
var client = new MqttFactory().CreateMqttClient(); | |||||
try | |||||
{ | |||||
var options = new MqttServerOptionsBuilder().WithConnectionValidator(context => | |||||
{ | |||||
context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized; | |||||
}).Build(); | |||||
await server.StartAsync(options); | |||||
var clientOptions = new MqttClientOptionsBuilder() | |||||
.WithTcpServer("localhost").Build(); | |||||
try | |||||
{ | |||||
await client.ConnectAsync(clientOptions); | |||||
Assert.Fail("An exception should be raised."); | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
if (exception is MqttConnectingFailedException) | |||||
{ | |||||
} | |||||
else | |||||
{ | |||||
Assert.Fail("Wrong exception."); | |||||
} | |||||
} | |||||
} | |||||
finally | |||||
{ | |||||
await client.DisconnectAsync(); | |||||
await server.StopAsync(); | |||||
client.Dispose(); | |||||
} | |||||
} | |||||
[TestMethod] | [TestMethod] | ||||
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() | public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() | ||||
{ | { | ||||
@@ -623,25 +669,24 @@ namespace MQTTnet.Core.Tests | |||||
MqttQualityOfServiceLevel filterQualityOfServiceLevel, | MqttQualityOfServiceLevel filterQualityOfServiceLevel, | ||||
int expectedReceivedMessagesCount) | int expectedReceivedMessagesCount) | ||||
{ | { | ||||
var serverAdapter = new TestMqttServerAdapter(); | |||||
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); | |||||
var s = new MqttFactory().CreateMqttServer(); | |||||
var receivedMessagesCount = 0; | var receivedMessagesCount = 0; | ||||
try | try | ||||
{ | { | ||||
await s.StartAsync(new MqttServerOptions()); | await s.StartAsync(new MqttServerOptions()); | ||||
var c1 = await serverAdapter.ConnectTestClient("c1"); | |||||
var c2 = await serverAdapter.ConnectTestClient("c2"); | |||||
var c1 = new MqttFactory().CreateMqttClient(); | |||||
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | ||||
await c1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); | |||||
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); | await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); | ||||
var c2 = new MqttFactory().CreateMqttClient(); | |||||
await c2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); | |||||
await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel)); | await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel)); | ||||
await Task.Delay(500); | await Task.Delay(500); | ||||
await c1.UnsubscribeAsync(topicFilter); | await c1.UnsubscribeAsync(topicFilter); | ||||
await Task.Delay(500); | await Task.Delay(500); | ||||
} | } | ||||
finally | finally | ||||