diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 90ec842..c0c71eb 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Core.Adapter { private readonly IMqttCommunicationChannel _channel; - private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write + private readonly Task _waitTask = Task.FromResult(0); // this task is used to prevent overlapping write public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) { @@ -79,31 +79,30 @@ namespace MQTTnet.Core.Adapter { try { - lock (_channel) + var waitTask = _waitTask; + + foreach (var packet in packets) { - foreach (var packet in packets) - { - if (packet == null) {continue; } + if (packet == null) { continue; } - MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); + MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); - var writeBuffer = PacketSerializer.Serialize(packet); - _sendTask = _sendTask.ContinueWith(p => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken); - } + var writeBuffer = PacketSerializer.Serialize(packet); + + waitTask = waitTask.ContinueWith(t => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken); + } + waitTask = waitTask.ContinueWith(t => + { if (timeout > TimeSpan.Zero) { - _sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout), cancellationToken).ConfigureAwait(false); - } - else - { - _sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken), cancellationToken); + return _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); } - } - - await _sendTask; // configure await false generates stackoverflow + return _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); + }, cancellationToken); + await waitTask; // configure await false generates stackoverflow } catch (TaskCanceledException) {