diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 61f0646..9bc9aca 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -25,7 +25,7 @@ namespace MQTTnet.Adapter readonly byte[] _fixedHeaderBuffer = new byte[2]; - SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); long _bytesReceived; long _bytesSent; @@ -111,6 +111,8 @@ namespace MQTTnet.Adapter public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) { + ThrowIfDisposed(); + await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -211,9 +213,7 @@ namespace MQTTnet.Adapter if (disposing) { _channel?.Dispose(); - _writerSemaphore?.Dispose(); - _writerSemaphore = null; } base.Dispose(disposing); diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index d4c595a..da7d6e6 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -446,7 +446,11 @@ namespace MQTTnet.Client await SendAndReceiveAsync(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false); } - await Task.Delay(keepAlivePeriod, cancellationToken).ConfigureAwait(false); + // Wait a fixed time in all cases. Calculation of the remaining time is complicated + // due to some edge cases and was buggy in the past. Now we wait half a second because the + // min keep alive value is one second so that the server will wait 1.5 seconds for a PING + // packet. + await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken).ConfigureAwait(false); } } catch (Exception exception)