From a77e0b8dde664377b89560109a51af7cedc68a08 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 21 Dec 2019 09:09:03 +0100 Subject: [PATCH] spread dispose pattern --- .../ManagedMqttClient.cs | 49 +++++++------------ .../MQTTnet/Implementations/MqttTcpChannel.cs | 16 ++++-- .../Implementations/MqttWebSocketChannel.cs | 35 +++++++------ Source/MQTTnet/Internal/Disposable.cs | 2 + .../PacketDispatcher/MqttPacketAwaiter.cs | 11 +++-- 5 files changed, 62 insertions(+), 51 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 89efe6b..9bb1e54 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -16,7 +16,7 @@ using MQTTnet.Server; namespace MQTTnet.Extensions.ManagedClient { - public class ManagedMqttClient : IManagedMqttClient + public class ManagedMqttClient : Disposable, IManagedMqttClient { private readonly BlockingQueue _messageQueue = new BlockingQueue(); @@ -42,15 +42,15 @@ namespace MQTTnet.Extensions.ManagedClient private Task _maintainConnectionTask; private ManagedMqttClientStorageManager _storageManager; - - private bool _disposed; - + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(ManagedMqttClient)); + + Options = new ManagedMqttClientOptions(); } public bool IsConnected => _mqttClient.IsConnected; @@ -242,36 +242,25 @@ namespace MQTTnet.Extensions.ManagedClient return Task.FromResult(0); } - public void Dispose() + protected override void Dispose(bool disposing) { - if (_disposed) + if (disposing) { - return; - } - - _disposed = true; - - StopPublishing(); - StopMaintainingConnection(); - - if (_maintainConnectionTask != null) - { - Task.WaitAny(_maintainConnectionTask); - _maintainConnectionTask = null; - } + StopPublishing(); + StopMaintainingConnection(); - _messageQueue.Dispose(); - _messageQueueLock.Dispose(); - _mqttClient.Dispose(); - _subscriptionsQueuedSignal.Dispose(); - } + if (_maintainConnectionTask != null) + { + _maintainConnectionTask.GetAwaiter().GetResult(); + _maintainConnectionTask = null; + } - private void ThrowIfDisposed() - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(ManagedMqttClient)); + _messageQueue.Dispose(); + _messageQueueLock.Dispose(); + _mqttClient.Dispose(); + _subscriptionsQueuedSignal.Dispose(); } + base.Dispose(disposing); } private async Task MaintainConnectionAsync(CancellationToken cancellationToken) @@ -292,7 +281,7 @@ namespace MQTTnet.Extensions.ManagedClient } finally { - if (!_disposed) + if (!IsDisposed) { try { diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index d500813..9b2ba56 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -10,10 +10,11 @@ using System.Runtime.ExceptionServices; using System.Threading; using MQTTnet.Channel; using MQTTnet.Client.Options; +using MQTTnet.Internal; namespace MQTTnet.Implementations { - public class MqttTcpChannel : IMqttChannel + public class MqttTcpChannel : Disposable, IMqttChannel { private readonly IMqttClientOptions _clientOptions; private readonly MqttClientTcpOptions _options; @@ -94,7 +95,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync(CancellationToken cancellationToken) { - Dispose(); + Cleanup(); return Task.FromResult(0); } @@ -158,7 +159,7 @@ namespace MQTTnet.Implementations } } - public void Dispose() + private void Cleanup() { // When the stream is disposed it will also close the socket and this will also dispose it. // So there is no need to dispose the socket again. @@ -177,6 +178,15 @@ namespace MQTTnet.Implementations _stream = null; } + protected override void Dispose(bool disposing) + { + if (disposing) + { + Cleanup(); + } + base.Dispose(disposing); + } + private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { if (_options.TlsOptions.CertificateValidationCallback != null) diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index 4a3dc70..c159b91 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -112,7 +112,7 @@ namespace MQTTnet.Implementations await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } - Dispose(); + Cleanup(); } public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -146,24 +146,29 @@ namespace MQTTnet.Implementations { if (disposing) { - _sendLock?.Dispose(); - _sendLock = null; - - try - { - _webSocket?.Dispose(); - } - catch (ObjectDisposedException) - { - } - finally - { - _webSocket = null; - } + Cleanup(); } base.Dispose(disposing); } + private void Cleanup() + { + _sendLock?.Dispose(); + _sendLock = null; + + try + { + _webSocket?.Dispose(); + } + catch (ObjectDisposedException) + { + } + finally + { + _webSocket = null; + } + } + private IWebProxy CreateProxy() { if (string.IsNullOrEmpty(_options.ProxyOptions?.Address)) diff --git a/Source/MQTTnet/Internal/Disposable.cs b/Source/MQTTnet/Internal/Disposable.cs index 2074b49..f8a72b5 100644 --- a/Source/MQTTnet/Internal/Disposable.cs +++ b/Source/MQTTnet/Internal/Disposable.cs @@ -4,6 +4,8 @@ namespace MQTTnet.Internal { public class Disposable : IDisposable { + protected bool IsDisposed => _isDisposed; + protected void ThrowIfDisposed() { if (_isDisposed) diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs index 19df6d4..8f906f6 100644 --- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs +++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs @@ -2,11 +2,12 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Exceptions; +using MQTTnet.Internal; using MQTTnet.Packets; namespace MQTTnet.PacketDispatcher { - public sealed class MqttPacketAwaiter : IMqttPacketAwaiter where TPacket : MqttBasePacket + public sealed class MqttPacketAwaiter : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket { private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); private readonly ushort? _packetIdentifier; @@ -52,9 +53,13 @@ namespace MQTTnet.PacketDispatcher Task.Run(() => _taskCompletionSource.TrySetCanceled()); } - public void Dispose() + protected override void Dispose(bool disposing) { - _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier); + if (disposing) + { + _owningPacketDispatcher.RemovePacketAwaiter(_packetIdentifier); + } + base.Dispose(disposing); } } } \ No newline at end of file