diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 963ee06..81e6caf 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -11,8 +11,15 @@
false
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.
+* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
+* [Client] Improve connection stability (thanks to @jltjohanlindqvist).
+* [ManagedClient] Fixed a memory leak (thanks to @zawodskoj).
+* [ManagedClient] Improved internal subscription management (#569, thanks to @cstichlberger).
+* [ManagedClient] Refactored log messages (thanks to @cstichlberger).
* [Server] Added support for assigned client IDs (MQTTv5 only) (thanks to @bcrosnier).
+* [Server] Added interceptor for unsubscriptions.
+* [MQTTnet.Server] Added interceptor for unsubscriptions.
Copyright Christian Kratky 2016-2019
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin
diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
index c9053e4..8b5a48f 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
@@ -16,15 +16,25 @@ using MQTTnet.Server;
namespace MQTTnet.Extensions.ManagedClient
{
- public class ManagedMqttClient : IManagedMqttClient
+ public class ManagedMqttClient : Disposable, IManagedMqttClient
{
private readonly BlockingQueue _messageQueue = new BlockingQueue();
+
+ ///
+ /// The subscriptions are managed in 2 separate buckets:
+ /// and are processed during normal operation
+ /// and are moved to the when they get processed. They can be accessed by
+ /// any thread and are therefore mutex'ed. get sent to the broker
+ /// at reconnect and are solely owned by .
+ ///
+ private readonly Dictionary _reconnectSubscriptions = new Dictionary();
private readonly Dictionary _subscriptions = new Dictionary();
private readonly HashSet _unsubscriptions = new HashSet();
+ private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);
private readonly IMqttClient _mqttClient;
private readonly IMqttNetChildLogger _logger;
-
+
private readonly AsyncLock _messageQueueLock = new AsyncLock();
private CancellationTokenSource _connectionCancellationToken;
@@ -32,10 +42,7 @@ namespace MQTTnet.Extensions.ManagedClient
private Task _maintainConnectionTask;
private ManagedMqttClientStorageManager _storageManager;
-
- private bool _disposed;
- private bool _subscriptionsNotPushed;
-
+
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
@@ -82,10 +89,6 @@ namespace MQTTnet.Extensions.ManagedClient
if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
- if (!options.ClientOptions.CleanSession)
- {
- throw new NotSupportedException("The managed client does not support existing sessions.");
- }
if (!_maintainConnectionTask?.IsCompleted ?? false) throw new InvalidOperationException("The managed client is already started.");
@@ -141,6 +144,7 @@ namespace MQTTnet.Extensions.ManagedClient
ThrowIfDisposed();
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
+ if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages");
MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);
@@ -169,7 +173,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
_messageQueue.Enqueue(applicationMessage);
-
+
if (_storageManager != null)
{
if (removedMessage != null)
@@ -206,9 +210,10 @@ namespace MQTTnet.Extensions.ManagedClient
foreach (var topicFilter in topicFilters)
{
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
- _subscriptionsNotPushed = true;
+ _unsubscriptions.Remove(topicFilter.Topic);
}
}
+ _subscriptionsQueuedSignal.Release();
return Task.FromResult(0);
}
@@ -223,45 +228,34 @@ namespace MQTTnet.Extensions.ManagedClient
{
foreach (var topic in topics)
{
- if (_subscriptions.Remove(topic))
- {
- _unsubscriptions.Add(topic);
- _subscriptionsNotPushed = true;
- }
+ _subscriptions.Remove(topic);
+ _unsubscriptions.Add(topic);
}
}
+ _subscriptionsQueuedSignal.Release();
return Task.FromResult(0);
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- if (_disposed)
- {
- return;
- }
-
- _disposed = true;
-
- StopPublishing();
- StopMaintainingConnection();
-
- if (_maintainConnectionTask != null)
+ if (disposing)
{
- Task.WaitAny(_maintainConnectionTask);
- _maintainConnectionTask = null;
- }
+ StopPublishing();
+ StopMaintainingConnection();
- _messageQueueLock.Dispose();
- _mqttClient.Dispose();
- }
+ if (_maintainConnectionTask != null)
+ {
+ _maintainConnectionTask.GetAwaiter().GetResult();
+ _maintainConnectionTask = null;
+ }
- private void ThrowIfDisposed()
- {
- if (_disposed)
- {
- throw new ObjectDisposedException(nameof(ManagedMqttClient));
+ _messageQueue.Dispose();
+ _messageQueueLock.Dispose();
+ _mqttClient.Dispose();
+ _subscriptionsQueuedSignal.Dispose();
}
+ base.Dispose(disposing);
}
private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
@@ -278,11 +272,11 @@ namespace MQTTnet.Extensions.ManagedClient
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while maintaining connection.");
+ _logger.Error(exception, "Error exception while maintaining connection.");
}
finally
{
- if (!_disposed)
+ if (!IsDisposed)
{
try
{
@@ -295,6 +289,12 @@ namespace MQTTnet.Extensions.ManagedClient
_logger.Info("Stopped");
}
+ _reconnectSubscriptions.Clear();
+ lock (_subscriptions)
+ {
+ _subscriptions.Clear();
+ _unsubscriptions.Clear();
+ }
}
}
@@ -310,16 +310,22 @@ namespace MQTTnet.Extensions.ManagedClient
return;
}
- if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
+ if (connectionState == ReconnectionResult.Reconnected)
+ {
+ await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
+ StartPublishing();
+ return;
+ }
+
+ if (connectionState == ReconnectionResult.Recovered)
{
- await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
StartPublishing();
return;
}
if (connectionState == ReconnectionResult.StillConnected)
{
- await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
+ await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -327,11 +333,11 @@ namespace MQTTnet.Extensions.ManagedClient
}
catch (MqttCommunicationException exception)
{
- _logger.Warning(exception, "Communication exception while maintaining connection.");
+ _logger.Warning(exception, "Communication error while maintaining connection.");
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while maintaining connection.");
+ _logger.Error(exception, "Error exception while maintaining connection.");
}
}
@@ -349,7 +355,7 @@ namespace MQTTnet.Extensions.ManagedClient
// of the messages, the DropOldestQueuedMessage strategy would
// be unable to know which message is actually the oldest and would
// instead drop the first item in the queue.
- var message = _messageQueue.PeekAndWait();
+ var message = _messageQueue.PeekAndWait(cancellationToken);
if (message == null)
{
continue;
@@ -389,7 +395,7 @@ namespace MQTTnet.Extensions.ManagedClient
// it from the queue. If not, that means this.PublishAsync has already
// removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
-
+
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
@@ -414,7 +420,7 @@ namespace MQTTnet.Extensions.ManagedClient
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
{
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
-
+
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
@@ -438,50 +444,84 @@ namespace MQTTnet.Extensions.ManagedClient
}
}
- private async Task SynchronizeSubscriptionsAsync()
+ private async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
- _logger.Info("Synchronizing subscriptions");
+ var endTime = DateTime.UtcNow + timeout;
+ while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false))
+ {
+ List subscriptions;
+ HashSet unsubscriptions;
- List subscriptions;
- HashSet unsubscriptions;
+ lock (_subscriptions)
+ {
+ subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
+ _subscriptions.Clear();
+ unsubscriptions = new HashSet(_unsubscriptions);
+ _unsubscriptions.Clear();
+ }
- lock (_subscriptions)
- {
- subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
+ if (!subscriptions.Any() && !unsubscriptions.Any())
+ {
+ continue;
+ }
- unsubscriptions = new HashSet(_unsubscriptions);
- _unsubscriptions.Clear();
+ _logger.Verbose($"Publishing subscriptions ({subscriptions.Count} subscriptions and {unsubscriptions.Count} unsubscriptions)");
- _subscriptionsNotPushed = false;
- }
+ foreach (var unsubscription in unsubscriptions)
+ {
+ _reconnectSubscriptions.Remove(unsubscription);
+ }
- if (!subscriptions.Any() && !unsubscriptions.Any())
- {
- return;
- }
+ foreach (var subscription in subscriptions)
+ {
+ _reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
+ }
- try
- {
- if (unsubscriptions.Any())
+ try
+ {
+ if (unsubscriptions.Any())
+ {
+ await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
+ }
+
+ if (subscriptions.Any())
+ {
+ await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
+ }
+ }
+ catch (Exception exception)
{
- await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
+ await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
}
+ }
+ }
+
+ private async Task PublishReconnectSubscriptionsAsync()
+ {
+ _logger.Info("Publishing subscriptions at reconnect");
- if (subscriptions.Any())
+ try
+ {
+ if (_reconnectSubscriptions.Any())
{
+ var subscriptions = _reconnectSubscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
}
}
catch (Exception exception)
{
- _logger.Warning(exception, "Synchronizing subscriptions failed.");
- _subscriptionsNotPushed = true;
+ await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
+ }
+ }
- var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
- if (SynchronizingSubscriptionsFailedHandler != null)
- {
- await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
- }
+ private async Task HandleSubscriptionExceptionAsync(Exception exception)
+ {
+ _logger.Warning(exception, "Synchronizing subscriptions failed.");
+
+ var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
+ if (SynchronizingSubscriptionsFailedHandler != null)
+ {
+ await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
}
}
@@ -494,8 +534,8 @@ namespace MQTTnet.Extensions.ManagedClient
try
{
- await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
- return ReconnectionResult.Reconnected;
+ var result = await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
+ return result.IsSessionPresent ? ReconnectionResult.Recovered : ReconnectionResult.Reconnected;
}
catch (Exception exception)
{
@@ -508,7 +548,7 @@ namespace MQTTnet.Extensions.ManagedClient
return ReconnectionResult.NotConnected;
}
}
-
+
private void StartPublishing()
{
if (_publishingCancellationToken != null)
@@ -535,5 +575,11 @@ namespace MQTTnet.Extensions.ManagedClient
_connectionCancellationToken?.Dispose();
_connectionCancellationToken = null;
}
+
+ private TimeSpan GetRemainingTime(DateTime endTime)
+ {
+ var remainingTime = endTime - DateTime.UtcNow;
+ return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
+ }
}
}
diff --git a/Source/MQTTnet.Extensions.ManagedClient/ReconnectionResult.cs b/Source/MQTTnet.Extensions.ManagedClient/ReconnectionResult.cs
index fa876c3..092662f 100644
--- a/Source/MQTTnet.Extensions.ManagedClient/ReconnectionResult.cs
+++ b/Source/MQTTnet.Extensions.ManagedClient/ReconnectionResult.cs
@@ -4,6 +4,7 @@
{
StillConnected,
Reconnected,
+ Recovered,
NotConnected
}
}
diff --git a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs
index 68b12ef..47d9682 100644
--- a/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs
+++ b/Source/MQTTnet.Extensions.WebSocket4Net/WebSocket4NetMqttChannel.cs
@@ -85,7 +85,12 @@ namespace MQTTnet.Extensions.WebSocket4Net
{
foreach (var certificate in _webSocketOptions.TlsOptions.Certificates)
{
+#if WINDOWS_UWP
certificates.Add(new X509Certificate(certificate));
+#else
+ certificates.Add(certificate);
+#endif
+
}
}
diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
index b8c463f..85c4176 100644
--- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
+++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs
@@ -33,6 +33,7 @@ namespace MQTTnet.Server.Mqtt
private readonly MqttServerConnectionValidator _mqttConnectionValidator;
private readonly IMqttServer _mqttServer;
private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
+ private readonly MqttUnsubscriptionInterceptor _mqttUnsubscriptionInterceptor;
private readonly PythonScriptHostService _pythonScriptHostService;
private readonly MqttWebSocketServerAdapter _webSocketServerAdapter;
@@ -45,6 +46,7 @@ namespace MQTTnet.Server.Mqtt
MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
MqttServerConnectionValidator mqttConnectionValidator,
MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
+ MqttUnsubscriptionInterceptor mqttUnsubscriptionInterceptor,
MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
MqttServerStorage mqttServerStorage,
PythonScriptHostService pythonScriptHostService,
@@ -57,6 +59,7 @@ namespace MQTTnet.Server.Mqtt
_mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
_mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
_mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
+ _mqttUnsubscriptionInterceptor = mqttUnsubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttUnsubscriptionInterceptor));
_mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
_mqttServerStorage = mqttServerStorage ?? throw new ArgumentNullException(nameof(mqttServerStorage));
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
@@ -178,6 +181,7 @@ namespace MQTTnet.Server.Mqtt
.WithConnectionValidator(_mqttConnectionValidator)
.WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
.WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
+ .WithUnsubscriptionInterceptor(_mqttUnsubscriptionInterceptor)
.WithStorage(_mqttServerStorage);
// Configure unencrypted connections
diff --git a/Source/MQTTnet.Server/Mqtt/MqttUnsubscriptionInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttUnsubscriptionInterceptor.cs
new file mode 100644
index 0000000..1a460af
--- /dev/null
+++ b/Source/MQTTnet.Server/Mqtt/MqttUnsubscriptionInterceptor.cs
@@ -0,0 +1,48 @@
+using System;
+using System.Threading.Tasks;
+using IronPython.Runtime;
+using Microsoft.Extensions.Logging;
+using MQTTnet.Server.Scripting;
+
+namespace MQTTnet.Server.Mqtt
+{
+ public class MqttUnsubscriptionInterceptor : IMqttServerUnsubscriptionInterceptor
+ {
+ private readonly PythonScriptHostService _pythonScriptHostService;
+ private readonly ILogger _logger;
+
+ public MqttUnsubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger logger)
+ {
+ _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public Task InterceptUnsubscriptionAsync(MqttUnsubscriptionInterceptorContext context)
+ {
+ try
+ {
+ var sessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey];
+
+ var pythonContext = new PythonDictionary
+ {
+ { "client_id", context.ClientId },
+ { "session_items", sessionItems },
+ { "topic", context.Topic },
+ { "accept_unsubscription", context.AcceptUnsubscription },
+ { "close_connection", context.CloseConnection }
+ };
+
+ _pythonScriptHostService.InvokeOptionalFunction("on_intercept_unsubscription", pythonContext);
+
+ context.AcceptUnsubscription = (bool)pythonContext["accept_unsubscription"];
+ context.CloseConnection = (bool)pythonContext["close_connection"];
+ }
+ catch (Exception exception)
+ {
+ _logger.LogError(exception, "Error while intercepting unsubscription.");
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
index 4a0a85d..f364168 100644
--- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
+++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs
@@ -14,7 +14,7 @@ using MQTTnet.Packets;
namespace MQTTnet.Adapter
{
- public class MqttChannelAdapter : IMqttChannelAdapter
+ public class MqttChannelAdapter : Disposable, IMqttChannelAdapter
{
private const uint ErrorOperationAborted = 0x800703E3;
private const int ReadBufferSize = 4096; // TODO: Move buffer size to config
@@ -26,9 +26,7 @@ namespace MQTTnet.Adapter
private readonly MqttPacketReader _packetReader;
private readonly byte[] _fixedHeaderBuffer = new byte[2];
-
- private bool _isDisposed;
-
+
private long _bytesReceived;
private long _bytesSent;
@@ -269,19 +267,13 @@ namespace MQTTnet.Adapter
}
}
- public void Dispose()
- {
- _isDisposed = true;
-
- _channel?.Dispose();
- }
-
- private void ThrowIfDisposed()
+ protected override void Dispose(bool disposing)
{
- if (_isDisposed)
+ if (disposing)
{
- throw new ObjectDisposedException(nameof(MqttChannelAdapter));
+ _channel?.Dispose();
}
+ base.Dispose(disposing);
}
private static bool IsWrappedException(Exception exception)
diff --git a/Source/MQTTnet/Adapter/MqttConnectingFailedException.cs b/Source/MQTTnet/Adapter/MqttConnectingFailedException.cs
index 44d50ec..ab49d93 100644
--- a/Source/MQTTnet/Adapter/MqttConnectingFailedException.cs
+++ b/Source/MQTTnet/Adapter/MqttConnectingFailedException.cs
@@ -5,12 +5,13 @@ namespace MQTTnet.Adapter
{
public class MqttConnectingFailedException : MqttCommunicationException
{
- public MqttConnectingFailedException(MqttClientConnectResultCode resultCode)
- : base($"Connecting with MQTT server failed ({resultCode.ToString()}).")
+ public MqttConnectingFailedException(MqttClientAuthenticateResult result)
+ : base($"Connecting with MQTT server failed ({result.ResultCode.ToString()}).")
{
- ResultCode = resultCode;
+ Result = result;
}
- public MqttClientConnectResultCode ResultCode { get; }
+ public MqttClientAuthenticateResult Result { get; }
+ public MqttClientConnectResultCode ResultCode => Result.ResultCode;
}
}
diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs
index 27b56ff..29687fc 100644
--- a/Source/MQTTnet/Client/MqttClient.cs
+++ b/Source/MQTTnet/Client/MqttClient.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -20,11 +20,12 @@ using MQTTnet.Protocol;
namespace MQTTnet.Client
{
- public class MqttClient : IMqttClient
+ public class MqttClient : Disposable, IMqttClient
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly Stopwatch _sendTracker = new Stopwatch();
+ private readonly Stopwatch _receiveTracker = new Stopwatch();
private readonly object _disconnectLock = new object();
private readonly IMqttClientAdapterFactory _adapterFactory;
@@ -63,6 +64,8 @@ namespace MQTTnet.Client
ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
+ ThrowIfDisposed();
+
MqttClientAuthenticateResult authenticateResult = null;
try
@@ -79,15 +82,19 @@ namespace MQTTnet.Client
var adapter = _adapterFactory.CreateClientAdapter(options, _logger);
_adapter = adapter;
- _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
- await _adapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- _logger.Verbose("Connection with server established.");
+ using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken))
+ {
+ _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
+ await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false);
+ _logger.Verbose("Connection with server established.");
- _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);
+ _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);
- authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, cancellationToken).ConfigureAwait(false);
+ authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false);
+ }
_sendTracker.Restart();
+ _receiveTracker.Restart();
if (Options.KeepAlivePeriod != TimeSpan.Zero)
{
@@ -149,7 +156,7 @@ namespace MQTTnet.Client
Properties = new MqttAuthPacketProperties
{
// This must always be equal to the value from the CONNECT packet. So we use it here to ensure that.
- AuthenticationMethod = Options.AuthenticationMethod,
+ AuthenticationMethod = Options.AuthenticationMethod,
AuthenticationData = data.AuthenticationData,
ReasonString = data.ReasonString,
UserProperties = data.UserProperties
@@ -161,6 +168,7 @@ namespace MQTTnet.Client
{
if (options == null) throw new ArgumentNullException(nameof(options));
+ ThrowIfDisposed();
ThrowIfNotConnected();
var subscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateSubscribePacket(options);
@@ -174,6 +182,7 @@ namespace MQTTnet.Client
{
if (options == null) throw new ArgumentNullException(nameof(options));
+ ThrowIfDisposed();
ThrowIfNotConnected();
var unsubscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateUnsubscribePacket(options);
@@ -189,6 +198,7 @@ namespace MQTTnet.Client
MqttTopicValidator.ThrowIfInvalid(applicationMessage.Topic);
+ ThrowIfDisposed();
ThrowIfNotConnected();
var publishPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(applicationMessage);
@@ -214,7 +224,7 @@ namespace MQTTnet.Client
}
}
- public void Dispose()
+ private void Cleanup()
{
_backgroundCancellationTokenSource?.Cancel(false);
_backgroundCancellationTokenSource?.Dispose();
@@ -224,6 +234,18 @@ namespace MQTTnet.Client
_adapter = null;
}
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ Cleanup();
+
+ DisconnectedHandler = null;
+ }
+ base.Dispose(disposing);
+ }
+
private async Task AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
{
var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket(
@@ -235,7 +257,7 @@ namespace MQTTnet.Client
if (result.ResultCode != MqttClientConnectResultCode.Success)
{
- throw new MqttConnectingFailedException(result.ResultCode);
+ throw new MqttConnectingFailedException(result);
}
_logger.Verbose("Authenticated MQTT connection with server established.");
@@ -258,29 +280,37 @@ namespace MQTTnet.Client
var clientWasConnected = IsConnected;
TryInitiateDisconnect();
+ IsConnected = false;
try
{
- IsConnected = false;
-
if (_adapter != null)
{
_logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout);
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
- await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
- await WaitForTaskAsync(_keepAlivePacketsSenderTask, sender).ConfigureAwait(false);
-
_logger.Verbose("Disconnected from adapter.");
}
catch (Exception adapterException)
{
_logger.Warning(adapterException, "Error while disconnecting from adapter.");
}
+
+ try
+ {
+ var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
+ var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);
+
+ await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ _logger.Warning(e, "Error while waiting for internal tasks.");
+ }
finally
{
- Dispose();
+ Cleanup();
_cleanDisconnectInitiated = false;
_logger.Info("Disconnected.");
@@ -344,11 +374,26 @@ namespace MQTTnet.Client
try
{
await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- return await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
}
- catch (MqttCommunicationTimedOutException)
+ catch (Exception e)
+ {
+ _logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name);
+ packetAwaiter.Cancel();
+ }
+
+ try
{
- _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
+ var response = await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
+ _receiveTracker.Restart();
+ return response;
+ }
+ catch (Exception exception)
+ {
+ if (exception is MqttCommunicationTimedOutException)
+ {
+ _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
+ }
+
throw;
}
}
@@ -369,14 +414,14 @@ namespace MQTTnet.Client
keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}
- var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
- if (waitTime <= TimeSpan.Zero)
+ var waitTimeSend = keepAliveSendInterval - _sendTracker.Elapsed;
+ var waitTimeReceive = keepAliveSendInterval - _receiveTracker.Elapsed;
+ if (waitTimeSend <= TimeSpan.Zero || waitTimeReceive <= TimeSpan.Zero)
{
await SendAndReceiveAsync(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
- waitTime = keepAliveSendInterval;
}
- await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
+ await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
@@ -391,11 +436,11 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets.");
+ _logger.Warning(exception, "Communication error while sending/receiving keep alive packets.");
}
else
{
- _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
+ _logger.Error(exception, "Error exception while sending/receiving keep alive packets.");
}
if (!DisconnectIsPending())
@@ -449,11 +494,11 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "MQTT communication exception while receiving packets.");
+ _logger.Warning(exception, "Communication error while receiving packets.");
}
else
{
- _logger.Error(exception, "Unhandled exception while receiving packets.");
+ _logger.Error(exception, "Error while receiving packets.");
}
_packetDispatcher.Dispatch(exception);
@@ -473,6 +518,8 @@ namespace MQTTnet.Client
{
try
{
+ _receiveTracker.Restart();
+
if (packet is MqttPublishPacket publishPacket)
{
await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
@@ -521,11 +568,11 @@ namespace MQTTnet.Client
}
else if (exception is MqttCommunicationException)
{
- _logger.Warning(exception, "MQTT communication exception while receiving packets.");
+ _logger.Warning(exception, "Communication error while receiving packets.");
}
else
{
- _logger.Error(exception, "Unhandled exception while receiving packets.");
+ _logger.Error(exception, "Error while receiving packets.");
}
_packetDispatcher.Dispatch(exception);
@@ -567,7 +614,7 @@ namespace MQTTnet.Client
};
await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
- }
+ }
}
else
{
@@ -576,7 +623,7 @@ namespace MQTTnet.Client
}
catch (Exception exception)
{
- _logger.Error(exception, "Unhandled exception while handling application message.");
+ _logger.Error(exception, "Error while handling application message.");
}
}
@@ -626,15 +673,25 @@ namespace MQTTnet.Client
return true;
}
- private static async Task WaitForTaskAsync(Task task, Task sender)
+ private async Task WaitForTaskAsync(Task task, Task sender)
{
- if (task == sender || task == null)
+ if (task == null)
{
return;
}
- if (task.IsCanceled || task.IsCompleted || task.IsFaulted)
+ if (task == sender)
{
+ // Return here to avoid deadlocks, but first any eventual exception in the task
+ // must be handled to avoid not getting an unhandled task exception
+ if (!task.IsFaulted)
+ {
+ return;
+ }
+
+ // By accessing the Exception property the exception is considered handled and will
+ // not result in an unhandled task exception later by the finalizer
+ _logger.Warning(task.Exception, "Error while waiting for background task.");
return;
}
@@ -652,4 +709,4 @@ namespace MQTTnet.Client
return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0;
}
}
-}
\ No newline at end of file
+}
diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
index 65a1ec9..4fd0ccf 100644
--- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
+++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
@@ -256,7 +256,11 @@ namespace MQTTnet.Client.Options
UseTls = true,
SslProtocol = _tlsParameters.SslProtocol,
AllowUntrustedCertificates = _tlsParameters.AllowUntrustedCertificates,
+#if WINDOWS_UWP
Certificates = _tlsParameters.Certificates?.Select(c => c.ToArray()).ToList(),
+#else
+ Certificates = _tlsParameters.Certificates?.ToList(),
+#endif
CertificateValidationCallback = _tlsParameters.CertificateValidationCallback,
IgnoreCertificateChainErrors = _tlsParameters.IgnoreCertificateChainErrors,
IgnoreCertificateRevocationErrors = _tlsParameters.IgnoreCertificateRevocationErrors
diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilderTlsParameters.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilderTlsParameters.cs
index ea36baa..d1854ff 100644
--- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilderTlsParameters.cs
+++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilderTlsParameters.cs
@@ -18,7 +18,12 @@ namespace MQTTnet.Client.Options
public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12;
+#if WINDOWS_UWP
public IEnumerable> Certificates { get; set; }
+#else
+ public IEnumerable Certificates { get; set; }
+#endif
+
public bool AllowUntrustedCertificates { get; set; }
diff --git a/Source/MQTTnet/Client/Options/MqttClientTlsOptions.cs b/Source/MQTTnet/Client/Options/MqttClientTlsOptions.cs
index db4077d..0d1a3a5 100644
--- a/Source/MQTTnet/Client/Options/MqttClientTlsOptions.cs
+++ b/Source/MQTTnet/Client/Options/MqttClientTlsOptions.cs
@@ -15,8 +15,11 @@ namespace MQTTnet.Client.Options
public bool IgnoreCertificateChainErrors { get; set; }
public bool AllowUntrustedCertificates { get; set; }
-
+#if WINDOWS_UWP
public List Certificates { get; set; }
+#else
+ public List Certificates { get; set; }
+#endif
public SslProtocols SslProtocol { get; set; } = SslProtocols.Tls12;
diff --git a/Source/MQTTnet/Client/Unsubscribing/MqttClientUnsubscribeOptionsBuilder.cs b/Source/MQTTnet/Client/Unsubscribing/MqttClientUnsubscribeOptionsBuilder.cs
new file mode 100644
index 0000000..96c178f
--- /dev/null
+++ b/Source/MQTTnet/Client/Unsubscribing/MqttClientUnsubscribeOptionsBuilder.cs
@@ -0,0 +1,60 @@
+using MQTTnet.Packets;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace MQTTnet.Client.Unsubscribing
+{
+ public class MqttClientUnsubscribeOptionsBuilder
+ {
+ private readonly MqttClientUnsubscribeOptions _unsubscribeOptions = new MqttClientUnsubscribeOptions();
+
+ public MqttClientUnsubscribeOptionsBuilder WithUserProperty(string name, string value)
+ {
+ if (name is null) throw new ArgumentNullException(nameof(name));
+ if (value is null) throw new ArgumentNullException(nameof(value));
+
+ return WithUserProperty(new MqttUserProperty(name, value));
+ }
+
+ public MqttClientUnsubscribeOptionsBuilder WithUserProperty(MqttUserProperty userProperty)
+ {
+ if (userProperty is null) throw new ArgumentNullException(nameof(userProperty));
+
+ if (_unsubscribeOptions.UserProperties is null)
+ {
+ _unsubscribeOptions.UserProperties = new List();
+ }
+
+ _unsubscribeOptions.UserProperties.Add(userProperty);
+
+ return this;
+ }
+
+ public MqttClientUnsubscribeOptionsBuilder WithTopicFilter(string topic)
+ {
+ if (topic is null) throw new ArgumentNullException(nameof(topic));
+
+ if (_unsubscribeOptions.TopicFilters is null)
+ {
+ _unsubscribeOptions.TopicFilters = new List();
+ }
+
+ _unsubscribeOptions.TopicFilters.Add(topic);
+
+ return this;
+ }
+
+ public MqttClientUnsubscribeOptionsBuilder WithTopicFilter(TopicFilter topicFilter)
+ {
+ if (topicFilter is null) throw new ArgumentNullException(nameof(topicFilter));
+
+ return WithTopic(topicFilter.Topic);
+ }
+
+ public MqttClientUnsubscribeOptions Build()
+ {
+ return _unsubscribeOptions;
+ }
+ }
+}
diff --git a/Source/MQTTnet/Exceptions/MqttConfigurationException.cs b/Source/MQTTnet/Exceptions/MqttConfigurationException.cs
new file mode 100644
index 0000000..4d10faf
--- /dev/null
+++ b/Source/MQTTnet/Exceptions/MqttConfigurationException.cs
@@ -0,0 +1,21 @@
+using System;
+
+namespace MQTTnet.Exceptions
+{
+ public class MqttConfigurationException : Exception
+ {
+ protected MqttConfigurationException()
+ {
+ }
+
+ public MqttConfigurationException(Exception innerException)
+ : base(innerException.Message, innerException)
+ {
+ }
+
+ public MqttConfigurationException(string message)
+ : base(message)
+ {
+ }
+ }
+}
diff --git a/Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs b/Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
index 42d3241..886b937 100644
--- a/Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
+++ b/Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
@@ -139,7 +139,7 @@ namespace MQTTnet.Formatter.V5
ReasonCode = connectionValidatorContext.ReasonCode,
Properties = new MqttConnAckPacketProperties
{
- UserProperties = connectionValidatorContext.UserProperties,
+ UserProperties = connectionValidatorContext.ResponseUserProperties,
AuthenticationMethod = connectionValidatorContext.AuthenticationMethod,
AuthenticationData = connectionValidatorContext.ResponseAuthenticationData,
AssignedClientIdentifier = connectionValidatorContext.AssignedClientIdentifier,
diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
index d7943ad..9b2ba56 100644
--- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs
@@ -10,10 +10,11 @@ using System.Runtime.ExceptionServices;
using System.Threading;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
+using MQTTnet.Internal;
namespace MQTTnet.Implementations
{
- public class MqttTcpChannel : IMqttChannel
+ public class MqttTcpChannel : Disposable, IMqttChannel
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
@@ -72,11 +73,7 @@ namespace MQTTnet.Implementations
// Workaround for: workaround for https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => socket.Dispose()))
{
-#if NET452 || NET461
- await Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, _options.Server, _options.GetPort(), null).ConfigureAwait(false);
-#else
- await socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false);
-#endif
+ await PlatformAbstractionLayer.ConnectAsync(socket, _options.Server, _options.GetPort()).ConfigureAwait(false);
}
var networkStream = new NetworkStream(socket, true);
@@ -98,7 +95,7 @@ namespace MQTTnet.Implementations
public Task DisconnectAsync(CancellationToken cancellationToken)
{
- Dispose();
+ Cleanup();
return Task.FromResult(0);
}
@@ -117,6 +114,10 @@ namespace MQTTnet.Implementations
return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
+ catch (ObjectDisposedException)
+ {
+ return 0;
+ }
catch (IOException exception)
{
if (exception.InnerException is SocketException socketException)
@@ -143,6 +144,10 @@ namespace MQTTnet.Implementations
await _stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
+ catch (ObjectDisposedException)
+ {
+ return;
+ }
catch (IOException exception)
{
if (exception.InnerException is SocketException socketException)
@@ -154,7 +159,7 @@ namespace MQTTnet.Implementations
}
}
- public void Dispose()
+ private void Cleanup()
{
// When the stream is disposed it will also close the socket and this will also dispose it.
// So there is no need to dispose the socket again.
@@ -173,6 +178,15 @@ namespace MQTTnet.Implementations
_stream = null;
}
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ Cleanup();
+ }
+ base.Dispose(disposing);
+ }
+
private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
if (_options.TlsOptions.CertificateValidationCallback != null)
@@ -214,7 +228,7 @@ namespace MQTTnet.Implementations
foreach (var certificate in _options.TlsOptions.Certificates)
{
- certificates.Add(new X509Certificate2(certificate));
+ certificates.Add(certificate);
}
return certificates;
diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
index d7f4e6f..501c4da 100644
--- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
@@ -8,11 +8,12 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
+using MQTTnet.Internal;
using MQTTnet.Server;
namespace MQTTnet.Implementations
{
- public class MqttTcpServerAdapter : IMqttServerAdapter
+ public class MqttTcpServerAdapter : Disposable, IMqttServerAdapter
{
private readonly List _listeners = new List();
private readonly IMqttNetChildLogger _logger;
@@ -72,11 +73,11 @@ namespace MQTTnet.Implementations
public Task StopAsync()
{
- Dispose();
+ Cleanup();
return Task.FromResult(0);
}
- public void Dispose()
+ private void Cleanup()
{
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
@@ -90,6 +91,15 @@ namespace MQTTnet.Implementations
_listeners.Clear();
}
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ Cleanup();
+ }
+ base.Dispose(disposing);
+ }
+
private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken)
{
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None))
diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs
index d57888e..f2f439e 100644
--- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs
+++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs
@@ -107,12 +107,7 @@ namespace MQTTnet.Implementations
{
try
{
-#if NET452 || NET461
- var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false);
-#else
- var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
-#endif
-
+ var clientSocket = await PlatformAbstractionLayer.AcceptAsync(_socket).ConfigureAwait(false);
if (clientSocket == null)
{
continue;
diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
index 38e4342..c159b91 100644
--- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
+++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
@@ -6,10 +6,11 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
+using MQTTnet.Internal;
namespace MQTTnet.Implementations
{
- public class MqttWebSocketChannel : IMqttChannel
+ public class MqttWebSocketChannel : Disposable, IMqttChannel
{
private readonly MqttClientWebSocketOptions _options;
@@ -84,7 +85,12 @@ namespace MQTTnet.Implementations
clientWebSocket.Options.ClientCertificates = new X509CertificateCollection();
foreach (var certificate in _options.TlsOptions.Certificates)
{
+#if WINDOWS_UWP
clientWebSocket.Options.ClientCertificates.Add(new X509Certificate(certificate));
+#else
+ clientWebSocket.Options.ClientCertificates.Add(certificate);
+#endif
+
}
}
@@ -106,7 +112,7 @@ namespace MQTTnet.Implementations
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}
- Dispose();
+ Cleanup();
}
public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -136,7 +142,16 @@ namespace MQTTnet.Implementations
}
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ Cleanup();
+ }
+ base.Dispose(disposing);
+ }
+
+ private void Cleanup()
{
_sendLock?.Dispose();
_sendLock = null;
diff --git a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
new file mode 100644
index 0000000..ee9057a
--- /dev/null
+++ b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
@@ -0,0 +1,104 @@
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Implementations
+{
+ public static class PlatformAbstractionLayer
+ {
+ public static async Task AcceptAsync(Socket socket)
+ {
+#if NET452 || NET461
+ try
+ {
+ return await Task.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null).ConfigureAwait(false);
+ }
+ catch (ObjectDisposedException)
+ {
+ return null;
+ }
+#else
+ return await socket.AcceptAsync().ConfigureAwait(false);
+#endif
+ }
+
+
+ public static Task ConnectAsync(Socket socket, IPAddress ip, int port)
+ {
+#if NET452 || NET461
+ return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ip, port, null);
+#else
+ return socket.ConnectAsync(ip, port);
+#endif
+ }
+
+ public static Task ConnectAsync(Socket socket, string host, int port)
+ {
+#if NET452 || NET461
+ return Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, host, port, null);
+#else
+ return socket.ConnectAsync(host, port);
+#endif
+ }
+
+#if NET452 || NET461
+ public class SocketWrapper
+ {
+ private readonly Socket _socket;
+ private readonly ArraySegment _buffer;
+ private readonly SocketFlags _socketFlags;
+
+ public SocketWrapper(Socket socket, ArraySegment buffer, SocketFlags socketFlags)
+ {
+ _socket = socket;
+ _buffer = buffer;
+ _socketFlags = socketFlags;
+ }
+
+ public static IAsyncResult BeginSend(AsyncCallback callback, object state)
+ {
+ var real = (SocketWrapper)state;
+ return real._socket.BeginSend(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
+ }
+
+ public static IAsyncResult BeginReceive(AsyncCallback callback, object state)
+ {
+ var real = (SocketWrapper)state;
+ return real._socket.BeginReceive(real._buffer.Array, real._buffer.Offset, real._buffer.Count, real._socketFlags, callback, state);
+ }
+ }
+#endif
+
+ public static Task SendAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags)
+ {
+#if NET452 || NET461
+ return Task.Factory.FromAsync(SocketWrapper.BeginSend, socket.EndSend, new SocketWrapper(socket, buffer, socketFlags));
+#else
+ return socket.SendAsync(buffer, socketFlags);
+#endif
+ }
+
+ public static Task ReceiveAsync(Socket socket, ArraySegment buffer, SocketFlags socketFlags)
+ {
+#if NET452 || NET461
+ return Task.Factory.FromAsync(SocketWrapper.BeginReceive, socket.EndReceive, new SocketWrapper(socket, buffer, socketFlags));
+#else
+ return socket.ReceiveAsync(buffer, socketFlags);
+#endif
+ }
+
+ public static Task CompletedTask
+ {
+ get
+ {
+#if NET452
+ return Task.FromResult(0);
+#else
+ return Task.CompletedTask;
+#endif
+ }
+ }
+
+ }
+}
diff --git a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs b/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs
deleted file mode 100644
index cd62f07..0000000
--- a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs
+++ /dev/null
@@ -1,131 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace MQTTnet.Internal
-{
- // Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490)
- public class AsyncAutoResetEvent
- {
- private readonly LinkedList> _waiters = new LinkedList>();
-
- private bool _isSignaled;
-
- public AsyncAutoResetEvent()
- : this(false)
- {
- }
-
- public AsyncAutoResetEvent(bool signaled)
- {
- _isSignaled = signaled;
- }
-
- public int WaitersCount
- {
- get
- {
- lock (_waiters)
- {
- return _waiters.Count;
- }
- }
- }
-
- public Task WaitOneAsync()
- {
- return WaitOneAsync(CancellationToken.None);
- }
-
- public Task WaitOneAsync(TimeSpan timeout)
- {
- return WaitOneAsync(timeout, CancellationToken.None);
- }
-
- public Task WaitOneAsync(CancellationToken cancellationToken)
- {
- return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken);
- }
-
- public async Task WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- TaskCompletionSource tcs;
-
- lock (_waiters)
- {
- if (_isSignaled)
- {
- _isSignaled = false;
- return true;
- }
-
- if (timeout == TimeSpan.Zero)
- {
- return _isSignaled;
- }
-
- tcs = new TaskCompletionSource();
- _waiters.AddLast(tcs);
- }
-
- Task winner;
- if (timeout == Timeout.InfiniteTimeSpan)
- {
- using (cancellationToken.Register(() => { tcs.TrySetCanceled(); }))
- {
- await tcs.Task.ConfigureAwait(false);
- winner = tcs.Task;
- }
- }
- else
- {
- winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false);
- }
-
- var taskWasSignaled = winner == tcs.Task;
- if (taskWasSignaled)
- {
- return true;
- }
-
- // We timed-out; remove our reference to the task.
- // This is an O(n) operation since waiters is a LinkedList.
- lock (_waiters)
- {
- _waiters.Remove(tcs);
-
- if (winner.Status == TaskStatus.Canceled)
- {
- throw new OperationCanceledException(cancellationToken);
- }
-
- throw new TimeoutException();
- }
- }
-
- public void Set()
- {
- TaskCompletionSource toRelease = null;
-
- lock (_waiters)
- {
- if (_waiters.Count > 0)
- {
- // Signal the first task in the waiters list.
- toRelease = _waiters.First.Value;
- _waiters.RemoveFirst();
- }
- else if (!_isSignaled)
- {
- // No tasks are pending
- _isSignaled = true;
- }
- }
-
- toRelease?.TrySetResult(true);
- }
- }
-}
diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs
index 485f644..2fa21be 100644
--- a/Source/MQTTnet/Internal/BlockingQueue.cs
+++ b/Source/MQTTnet/Internal/BlockingQueue.cs
@@ -4,11 +4,11 @@ using System.Threading;
namespace MQTTnet.Internal
{
- public class BlockingQueue
+ public class BlockingQueue : Disposable
{
private readonly object _syncRoot = new object();
private readonly LinkedList _items = new LinkedList();
- private readonly ManualResetEvent _gate = new ManualResetEvent(false);
+ private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(false);
public int Count
{
@@ -32,7 +32,7 @@ namespace MQTTnet.Internal
}
}
- public TItem Dequeue()
+ public TItem Dequeue(CancellationToken cancellationToken = default(CancellationToken))
{
while (true)
{
@@ -52,11 +52,11 @@ namespace MQTTnet.Internal
}
}
- _gate.WaitOne();
+ _gate.Wait(cancellationToken);
}
}
- public TItem PeekAndWait()
+ public TItem PeekAndWait(CancellationToken cancellationToken = default(CancellationToken))
{
while (true)
{
@@ -73,7 +73,7 @@ namespace MQTTnet.Internal
}
}
- _gate.WaitOne();
+ _gate.Wait(cancellationToken);
}
}
@@ -108,5 +108,14 @@ namespace MQTTnet.Internal
_items.Clear();
}
}
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _gate.Dispose();
+ }
+ base.Dispose(disposing);
+ }
}
}
diff --git a/Source/MQTTnet/Internal/Disposable.cs b/Source/MQTTnet/Internal/Disposable.cs
new file mode 100644
index 0000000..2ce3423
--- /dev/null
+++ b/Source/MQTTnet/Internal/Disposable.cs
@@ -0,0 +1,57 @@
+using System;
+
+namespace MQTTnet.Internal
+{
+ public class Disposable : IDisposable
+ {
+ protected bool IsDisposed => _isDisposed;
+
+ protected void ThrowIfDisposed()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(GetType().Name);
+ }
+ }
+
+
+ #region IDisposable Support
+
+ private bool _isDisposed = false; // To detect redundant calls
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ // TODO: dispose managed state (managed objects).
+ }
+
+ // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
+ // TODO: set large fields to null.
+ }
+
+ // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
+ // ~Disposable()
+ // {
+ // // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
+ // Dispose(false);
+ // }
+
+ // This code added to correctly implement the disposable pattern.
+ public void Dispose()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ _isDisposed = true;
+
+ // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
+ Dispose(true);
+ // TODO: uncomment the following line if the finalizer is overridden above.
+ // GC.SuppressFinalize(this);
+ }
+ #endregion
+ }
+}
diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj
index 61ca517..28e3f3d 100644
--- a/Source/MQTTnet/MQTTnet.csproj
+++ b/Source/MQTTnet/MQTTnet.csproj
@@ -62,5 +62,9 @@
+
+
+
+
\ No newline at end of file
diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
index 19df6d4..b172290 100644
--- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
+++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
@@ -2,13 +2,14 @@
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Exceptions;
+using MQTTnet.Internal;
using MQTTnet.Packets;
namespace MQTTnet.PacketDispatcher
{
- public sealed class MqttPacketAwaiter : IMqttPacketAwaiter where TPacket : MqttBasePacket
+ public sealed class MqttPacketAwaiter : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket
{
- private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource();
+ private readonly TaskCompletionSource _taskCompletionSource;
private readonly ushort? _packetIdentifier;
private readonly MqttPacketDispatcher _owningPacketDispatcher;
@@ -16,13 +17,18 @@ namespace MQTTnet.PacketDispatcher
{
_packetIdentifier = packetIdentifier;
_owningPacketDispatcher = owningPacketDispatcher ?? throw new ArgumentNullException(nameof(owningPacketDispatcher));
+#if NET452
+ _taskCompletionSource = new TaskCompletionSource();
+#else
+ _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+#endif
}
public async Task WaitOneAsync(TimeSpan timeout)
{
using (var timeoutToken = new CancellationTokenSource(timeout))
{
- timeoutToken.Token.Register(() => _taskCompletionSource.TrySetException(new MqttCommunicationTimedOutException()));
+ timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException()));
var packet = await _taskCompletionSource.Task.ConfigureAwait(false);
return (TPacket)packet;
@@ -32,29 +38,56 @@ namespace MQTTnet.PacketDispatcher
public void Complete(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
-
+
+
+#if NET452
// To prevent deadlocks it is required to call the _TrySetResult_ method
// from a new thread because the awaiting code will not(!) be executed in
// a new thread automatically (due to await). Furthermore _this_ thread will
// do it. But _this_ thread is also reading incoming packets -> deadlock.
+ // NET452 does not support RunContinuationsAsynchronously
Task.Run(() => _taskCompletionSource.TrySetResult(packet));
+#else
+ _taskCompletionSource.TrySetResult(packet);
+#endif
}
public void Fail(Exception exception)
{
if (exception == null) throw new ArgumentNullException(nameof(exception));
-
+#if NET452
+ // To prevent deadlocks it is required to call the _TrySetResult_ method
+ // from a new thread because the awaiting code will not(!) be executed in
+ // a new thread automatically (due to await). Furthermore _this_ thread will
+ // do it. But _this_ thread is also reading incoming packets -> deadlock.
+ // NET452 does not support RunContinuationsAsynchronously
Task.Run(() => _taskCompletionSource.TrySetException(exception));
+#else
+ _taskCompletionSource.TrySetException(exception);
+#endif
}
public void Cancel()
{
+#if NET452
+ // To prevent deadlocks it is required to call the _TrySetResult_ method
+ // from a new thread because the awaiting code will not(!) be executed in
+ // a new thread automatically (due to await). Furthermore _this_ thread will
+ // do it. But _this_ thread is also reading incoming packets -> deadlock.
+ // NET452 does not support RunContinuationsAsynchronously
Task.Run(() => _taskCompletionSource.TrySetCanceled());
+#else
+ _taskCompletionSource.TrySetCanceled();
+#endif
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier);
+ if (disposing)
+ {
+ _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier);
+ }
+ base.Dispose(disposing);
}
}
}
\ No newline at end of file
diff --git a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
new file mode 100644
index 0000000..6ffdd2b
--- /dev/null
+++ b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using MQTTnet.Diagnostics;
+
+namespace MQTTnet.Server
+{
+ public interface IMqttRetainedMessagesManager
+ {
+ Task Start(IMqttServerOptions options, IMqttNetChildLogger logger);
+
+ Task LoadMessagesAsync();
+
+ Task ClearMessagesAsync();
+
+ Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);
+
+ Task> GetMessagesAsync();
+
+ Task> GetSubscribedMessagesAsync(ICollection topicFilters);
+ }
+}
diff --git a/Source/MQTTnet/Server/IMqttServerOptions.cs b/Source/MQTTnet/Server/IMqttServerOptions.cs
index 7c5fde4..7df6f54 100644
--- a/Source/MQTTnet/Server/IMqttServerOptions.cs
+++ b/Source/MQTTnet/Server/IMqttServerOptions.cs
@@ -15,14 +15,15 @@ namespace MQTTnet.Server
IMqttServerConnectionValidator ConnectionValidator { get; }
IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; }
+ IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; }
IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; }
IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; }
MqttServerTcpEndpointOptions DefaultEndpointOptions { get; }
MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; }
- IMqttServerStorage Storage { get; }
-
+ IMqttServerStorage Storage { get; }
+ IMqttRetainedMessagesManager RetainedMessagesManager { get; }
}
}
\ No newline at end of file
diff --git a/Source/MQTTnet/Server/IMqttServerUnsubscriptionInterceptor.cs b/Source/MQTTnet/Server/IMqttServerUnsubscriptionInterceptor.cs
new file mode 100644
index 0000000..9669383
--- /dev/null
+++ b/Source/MQTTnet/Server/IMqttServerUnsubscriptionInterceptor.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace MQTTnet.Server
+{
+ public interface IMqttServerUnsubscriptionInterceptor
+ {
+ Task InterceptUnsubscriptionAsync(MqttUnsubscriptionInterceptorContext context);
+ }
+}
diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs
index e71d1a8..c9e2553 100644
--- a/Source/MQTTnet/Server/MqttClientConnection.cs
+++ b/Source/MQTTnet/Server/MqttClientConnection.cs
@@ -21,7 +21,7 @@ namespace MQTTnet.Server
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
- private readonly MqttRetainedMessagesManager _retainedMessagesManager;
+ private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientSessionsManager _sessionsManager;
@@ -36,7 +36,7 @@ namespace MQTTnet.Server
private Task _packageReceiverTask;
private DateTime _lastPacketReceivedTimestamp;
private DateTime _lastNonKeepAlivePacketReceivedTimestamp;
-
+
private long _receivedPacketsCount;
private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
private long _receivedApplicationMessagesCount;
@@ -48,14 +48,14 @@ namespace MQTTnet.Server
MqttClientSession session,
IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager,
- MqttRetainedMessagesManager retainedMessagesManager,
+ IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetChildLogger logger)
{
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
-
+
_channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
_dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
_endpoint = _channelAdapter.Endpoint;
@@ -76,7 +76,7 @@ namespace MQTTnet.Server
public string ClientId => ConnectPacket.ClientId;
public MqttClientSession Session { get; }
-
+
public async Task StopAsync()
{
StopInternal();
@@ -112,25 +112,25 @@ namespace MQTTnet.Server
status.BytesSent = _channelAdapter.BytesSent;
status.BytesReceived = _channelAdapter.BytesReceived;
}
-
+
public void Dispose()
{
_cancellationToken.Dispose();
}
- public Task RunAsync()
+ public Task RunAsync(MqttConnectionValidatorContext connectionValidatorContext)
{
- _packageReceiverTask = RunInternalAsync();
+ _packageReceiverTask = RunInternalAsync(connectionValidatorContext);
return _packageReceiverTask;
}
- private async Task RunInternalAsync()
+ private async Task RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
{
var disconnectType = MqttClientDisconnectType.NotClean;
try
{
_logger.Info("Client '{0}': Session started.", ClientId);
-
+
_channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
_channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted;
@@ -142,12 +142,8 @@ namespace MQTTnet.Server
_keepAliveMonitor.Start(ConnectPacket.KeepAlivePeriod, _cancellationToken.Token);
await SendAsync(
- new MqttConnAckPacket
- {
- ReturnCode = MqttConnectReturnCode.ConnectionAccepted,
- ReasonCode = MqttConnectReasonCode.Success,
- IsSessionPresent = !Session.IsCleanSession
- }).ConfigureAwait(false);
+ _channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext)
+ ).ConfigureAwait(false);
Session.IsCleanSession = false;
@@ -248,7 +244,7 @@ namespace MQTTnet.Server
_channelAdapter.ReadingPacketCompletedCallback = null;
_logger.Info("Client '{0}': Session stopped.", ClientId);
-
+
_packageReceiverTask = null;
}
diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs
index d165001..d097b9f 100644
--- a/Source/MQTTnet/Server/MqttClientSession.cs
+++ b/Source/MQTTnet/Server/MqttClientSession.cs
@@ -52,7 +52,7 @@ namespace MQTTnet.Server
ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}
- public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager)
+ public async Task SubscribeAsync(ICollection topicFilters, IMqttRetainedMessagesManager retainedMessagesManager)
{
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);
diff --git a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs
index 901ac75..0cf19c8 100644
--- a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs
+++ b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs
@@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace MQTTnet.Server
{
- public class MqttClientSessionApplicationMessagesQueue : IDisposable
+ public class MqttClientSessionApplicationMessagesQueue : Disposable
{
private readonly AsyncQueue _messageQueue = new AsyncQueue();
@@ -71,9 +71,14 @@ namespace MQTTnet.Server
}
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- _messageQueue.Dispose();
+ if (disposing)
+ {
+ _messageQueue.Dispose();
+ }
+
+ base.Dispose(disposing);
}
}
}
diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
index db70e95..28c163d 100644
--- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs
+++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs
@@ -13,7 +13,7 @@ using MQTTnet.Server.Status;
namespace MQTTnet.Server
{
- public class MqttClientSessionsManager : IDisposable
+ public class MqttClientSessionsManager : Disposable
{
private readonly AsyncQueue _messageQueue = new AsyncQueue();
@@ -25,13 +25,13 @@ namespace MQTTnet.Server
private readonly CancellationToken _cancellationToken;
private readonly MqttServerEventDispatcher _eventDispatcher;
- private readonly MqttRetainedMessagesManager _retainedMessagesManager;
+ private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;
public MqttClientSessionsManager(
IMqttServerOptions options,
- MqttRetainedMessagesManager retainedMessagesManager,
+ IMqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
@@ -72,7 +72,7 @@ namespace MQTTnet.Server
{
var clientStatus = new MqttClientStatus(connection);
connection.FillStatus(clientStatus);
-
+
var sessionStatus = new MqttSessionStatus(connection.Session, this);
connection.Session.FillStatus(sessionStatus);
clientStatus.Session = sessionStatus;
@@ -91,7 +91,7 @@ namespace MQTTnet.Server
{
var sessionStatus = new MqttSessionStatus(session, this);
session.FillStatus(sessionStatus);
-
+
result.Add(sessionStatus);
}
@@ -145,9 +145,13 @@ namespace MQTTnet.Server
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- _messageQueue?.Dispose();
+ if (disposing)
+ {
+ _messageQueue?.Dispose();
+ }
+ base.Dispose(disposing);
}
private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)
@@ -230,6 +234,7 @@ namespace MQTTnet.Server
{
var disconnectType = MqttClientDisconnectType.NotClean;
string clientId = null;
+ var clientWasConnected = true;
try
{
@@ -240,12 +245,13 @@ namespace MQTTnet.Server
return;
}
- clientId = connectPacket.ClientId;
-
var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);
+ clientId = connectPacket.ClientId;
+
if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
+ clientWasConnected = false;
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
@@ -257,8 +263,8 @@ namespace MQTTnet.Server
var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);
await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
-
- disconnectType = await connection.RunAsync().ConfigureAwait(false);
+
+ disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -269,21 +275,24 @@ namespace MQTTnet.Server
}
finally
{
- if (clientId != null)
+ if (clientWasConnected)
{
- _connections.TryRemove(clientId, out _);
-
- if (!_options.EnablePersistentSessions)
+ if (clientId != null)
{
- await DeleteSessionAsync(clientId).ConfigureAwait(false);
+ _connections.TryRemove(clientId, out _);
+
+ if (!_options.EnablePersistentSessions)
+ {
+ await DeleteSessionAsync(clientId).ConfigureAwait(false);
+ }
}
- }
- await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);
+ await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);
- if (clientId != null)
- {
- await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
+ if (clientId != null)
+ {
+ await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
+ }
}
}
}
@@ -328,13 +337,13 @@ namespace MQTTnet.Server
{
await existingConnection.StopAsync().ConfigureAwait(false);
}
-
+
if (isSessionPresent)
{
if (connectPacket.CleanSession)
{
session = null;
-
+
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
}
else
diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
index c84a018..deeadf4 100644
--- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
+++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
@@ -107,9 +107,16 @@ namespace MQTTnet.Server
PacketIdentifier = unsubscribePacket.PacketIdentifier
};
- lock (_subscriptions)
+ foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
- foreach (var topicFilter in unsubscribePacket.TopicFilters)
+ var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false);
+ if (!interceptorContext.AcceptUnsubscription)
+ {
+ unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.ImplementationSpecificError);
+ continue;
+ }
+
+ lock (_subscriptions)
{
if (_subscriptions.Remove(topicFilter))
{
@@ -130,19 +137,23 @@ namespace MQTTnet.Server
return unsubAckPacket;
}
- public Task UnsubscribeAsync(IEnumerable topicFilters)
+ public async Task UnsubscribeAsync(IEnumerable topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
- lock (_subscriptions)
+ foreach (var topicFilter in topicFilters)
{
- foreach (var topicFilter in topicFilters)
+ var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false);
+ if (!interceptorContext.AcceptUnsubscription)
{
- _subscriptions.Remove(topicFilter);
+ continue;
}
- }
- return Task.FromResult(0);
+ lock (_subscriptions)
+ {
+ _subscriptions.Remove(topicFilter);
+ }
+ }
}
public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel)
@@ -206,6 +217,17 @@ namespace MQTTnet.Server
return context;
}
+ private async Task InterceptUnsubscribeAsync(string topicFilter)
+ {
+ var context = new MqttUnsubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items);
+ if (_serverOptions.UnsubscriptionInterceptor != null)
+ {
+ await _serverOptions.UnsubscriptionInterceptor.InterceptUnsubscriptionAsync(context).ConfigureAwait(false);
+ }
+
+ return context;
+ }
+
private static CheckSubscriptionsResult CreateSubscriptionResult(MqttQualityOfServiceLevel qosLevel, HashSet subscribedQoSLevels)
{
MqttQualityOfServiceLevel effectiveQoS;
diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
index 2e6af16..f4ebe48 100644
--- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
+++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
@@ -3,24 +3,26 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
+using MQTTnet.Implementations;
using MQTTnet.Internal;
namespace MQTTnet.Server
{
- public class MqttRetainedMessagesManager
+ public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager
{
private readonly byte[] _emptyArray = new byte[0];
private readonly AsyncLock _messagesLock = new AsyncLock();
private readonly Dictionary _messages = new Dictionary();
- private readonly IMqttNetChildLogger _logger;
- private readonly IMqttServerOptions _options;
+ private IMqttNetChildLogger _logger;
+ private IMqttServerOptions _options;
- public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetChildLogger logger)
+ public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager));
_options = options ?? throw new ArgumentNullException(nameof(options));
+ return PlatformAbstractionLayer.CompletedTask;
}
public async Task LoadMessagesAsync()
@@ -103,7 +105,7 @@ namespace MQTTnet.Server
}
}
- public async Task> GetSubscribedMessagesAsync(ICollection topicFilters)
+ public async Task> GetSubscribedMessagesAsync(ICollection topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
@@ -128,7 +130,7 @@ namespace MQTTnet.Server
break;
}
}
-
+
return matchingRetainedMessages;
}
diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs
index f902fc0..4c5ab62 100644
--- a/Source/MQTTnet/Server/MqttServer.cs
+++ b/Source/MQTTnet/Server/MqttServer.cs
@@ -7,6 +7,7 @@ using MQTTnet.Adapter;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
+using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;
@@ -19,7 +20,7 @@ namespace MQTTnet.Server
private readonly IMqttNetChildLogger _logger;
private MqttClientSessionsManager _clientSessionsManager;
- private MqttRetainedMessagesManager _retainedMessagesManager;
+ private IMqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource;
public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger)
@@ -48,7 +49,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientDisconnectedHandler;
set => _eventDispatcher.ClientDisconnectedHandler = value;
}
-
+
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{
get => _eventDispatcher.ClientSubscribedTopicHandler;
@@ -60,7 +61,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientUnsubscribedTopicHandler;
set => _eventDispatcher.ClientUnsubscribedTopicHandler = value;
}
-
+
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => _eventDispatcher.ApplicationMessageReceivedHandler;
@@ -117,11 +118,14 @@ namespace MQTTnet.Server
{
Options = options ?? throw new ArgumentNullException(nameof(options));
+ if (Options.RetainedMessagesManager == null) throw new MqttConfigurationException("options.RetainedMessagesManager should not be null.");
+
if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started.");
_cancellationTokenSource = new CancellationTokenSource();
- _retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
+ _retainedMessagesManager = Options.RetainedMessagesManager;
+ await _retainedMessagesManager.Start(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);
_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
@@ -150,9 +154,9 @@ namespace MQTTnet.Server
{
return;
}
-
+
await _clientSessionsManager.StopAsync().ConfigureAwait(false);
-
+
_cancellationTokenSource.Cancel(false);
foreach (var adapter in _adapters)
diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs
index 7147ef8..9773e72 100644
--- a/Source/MQTTnet/Server/MqttServerOptions.cs
+++ b/Source/MQTTnet/Server/MqttServerOptions.cs
@@ -21,11 +21,15 @@ namespace MQTTnet.Server
public IMqttServerConnectionValidator ConnectionValidator { get; set; }
public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; }
-
+
public IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; set; }
public IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; set; }
+ public IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; set; }
+
public IMqttServerStorage Storage { get; set; }
+
+ public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager();
}
}
diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
index c25af84..2970fab 100644
--- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
+++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
@@ -57,7 +57,7 @@ namespace MQTTnet.Server
_options.DefaultEndpointOptions.IsEnabled = false;
return this;
}
-
+
public MqttServerOptionsBuilder WithEncryptedEndpoint()
{
_options.TlsEndpointOptions.IsEnabled = true;
@@ -118,13 +118,19 @@ namespace MQTTnet.Server
return this;
}
#endif
-
+
public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value)
{
_options.Storage = value;
return this;
}
+ public MqttServerOptionsBuilder WithRetainedMessagesManager(IMqttRetainedMessagesManager value)
+ {
+ _options.RetainedMessagesManager = value;
+ return this;
+ }
+
public MqttServerOptionsBuilder WithConnectionValidator(IMqttServerConnectionValidator value)
{
_options.ConnectionValidator = value;
@@ -155,6 +161,12 @@ namespace MQTTnet.Server
return this;
}
+ public MqttServerOptionsBuilder WithUnsubscriptionInterceptor(IMqttServerUnsubscriptionInterceptor value)
+ {
+ _options.UnsubscriptionInterceptor = value;
+ return this;
+ }
+
public MqttServerOptionsBuilder WithSubscriptionInterceptor(Action value)
{
_options.SubscriptionInterceptor = new MqttServerSubscriptionInterceptorDelegate(value);
diff --git a/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs
new file mode 100644
index 0000000..b33cbac
--- /dev/null
+++ b/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs
@@ -0,0 +1,27 @@
+using System.Collections.Generic;
+
+namespace MQTTnet.Server
+{
+ public class MqttUnsubscriptionInterceptorContext
+ {
+ public MqttUnsubscriptionInterceptorContext(string clientId, string topic, IDictionary