Browse Source

spread dispose pattern

release/3.x.x
JanEggers 5 years ago
parent
commit
a77e0b8dde
5 changed files with 62 additions and 51 deletions
  1. +19
    -30
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  2. +13
    -3
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  3. +20
    -15
      Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
  4. +2
    -0
      Source/MQTTnet/Internal/Disposable.cs
  5. +8
    -3
      Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs

+ 19
- 30
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -16,7 +16,7 @@ using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
public class ManagedMqttClient : Disposable, IManagedMqttClient
{
private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

@@ -42,15 +42,15 @@ namespace MQTTnet.Extensions.ManagedClient
private Task _maintainConnectionTask;

private ManagedMqttClientStorageManager _storageManager;

private bool _disposed;

public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(ManagedMqttClient));

Options = new ManagedMqttClientOptions();
}

public bool IsConnected => _mqttClient.IsConnected;
@@ -242,36 +242,25 @@ namespace MQTTnet.Extensions.ManagedClient
return Task.FromResult(0);
}

public void Dispose()
protected override void Dispose(bool disposing)
{
if (_disposed)
if (disposing)
{
return;
}

_disposed = true;

StopPublishing();
StopMaintainingConnection();

if (_maintainConnectionTask != null)
{
Task.WaitAny(_maintainConnectionTask);
_maintainConnectionTask = null;
}
StopPublishing();
StopMaintainingConnection();

_messageQueue.Dispose();
_messageQueueLock.Dispose();
_mqttClient.Dispose();
_subscriptionsQueuedSignal.Dispose();
}
if (_maintainConnectionTask != null)
{
_maintainConnectionTask.GetAwaiter().GetResult();
_maintainConnectionTask = null;
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ManagedMqttClient));
_messageQueue.Dispose();
_messageQueueLock.Dispose();
_mqttClient.Dispose();
_subscriptionsQueuedSignal.Dispose();
}
base.Dispose(disposing);
}

private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
@@ -292,7 +281,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
finally
{
if (!_disposed)
if (!IsDisposed)
{
try
{


+ 13
- 3
Source/MQTTnet/Implementations/MqttTcpChannel.cs View File

@@ -10,10 +10,11 @@ using System.Runtime.ExceptionServices;
using System.Threading;
using MQTTnet.Channel;
using MQTTnet.Client.Options;
using MQTTnet.Internal;

namespace MQTTnet.Implementations
{
public class MqttTcpChannel : IMqttChannel
public class MqttTcpChannel : Disposable, IMqttChannel
{
private readonly IMqttClientOptions _clientOptions;
private readonly MqttClientTcpOptions _options;
@@ -94,7 +95,7 @@ namespace MQTTnet.Implementations

public Task DisconnectAsync(CancellationToken cancellationToken)
{
Dispose();
Cleanup();
return Task.FromResult(0);
}

@@ -158,7 +159,7 @@ namespace MQTTnet.Implementations
}
}

public void Dispose()
private void Cleanup()
{
// When the stream is disposed it will also close the socket and this will also dispose it.
// So there is no need to dispose the socket again.
@@ -177,6 +178,15 @@ namespace MQTTnet.Implementations
_stream = null;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
}
base.Dispose(disposing);
}

private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
if (_options.TlsOptions.CertificateValidationCallback != null)


+ 20
- 15
Source/MQTTnet/Implementations/MqttWebSocketChannel.cs View File

@@ -112,7 +112,7 @@ namespace MQTTnet.Implementations
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}

Dispose();
Cleanup();
}

public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -146,24 +146,29 @@ namespace MQTTnet.Implementations
{
if (disposing)
{
_sendLock?.Dispose();
_sendLock = null;

try
{
_webSocket?.Dispose();
}
catch (ObjectDisposedException)
{
}
finally
{
_webSocket = null;
}
Cleanup();
}
base.Dispose(disposing);
}

private void Cleanup()
{
_sendLock?.Dispose();
_sendLock = null;

try
{
_webSocket?.Dispose();
}
catch (ObjectDisposedException)
{
}
finally
{
_webSocket = null;
}
}

private IWebProxy CreateProxy()
{
if (string.IsNullOrEmpty(_options.ProxyOptions?.Address))


+ 2
- 0
Source/MQTTnet/Internal/Disposable.cs View File

@@ -4,6 +4,8 @@ namespace MQTTnet.Internal
{
public class Disposable : IDisposable
{
protected bool IsDisposed => _isDisposed;

protected void ThrowIfDisposed()
{
if (_isDisposed)


+ 8
- 3
Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs View File

@@ -2,11 +2,12 @@
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets;

namespace MQTTnet.PacketDispatcher
{
public sealed class MqttPacketAwaiter<TPacket> : IMqttPacketAwaiter where TPacket : MqttBasePacket
public sealed class MqttPacketAwaiter<TPacket> : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket
{
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>();
private readonly ushort? _packetIdentifier;
@@ -52,9 +53,13 @@ namespace MQTTnet.PacketDispatcher
Task.Run(() => _taskCompletionSource.TrySetCanceled());
}

public void Dispose()
protected override void Dispose(bool disposing)
{
_owningPacketDispatcher.RemovePacketAwaiter<TPacket>(_packetIdentifier);
if (disposing)
{
_owningPacketDispatcher.RemovePacketAwaiter<TPacket>(_packetIdentifier);
}
base.Dispose(disposing);
}
}
}

Loading…
Cancel
Save