|
@@ -17,7 +17,7 @@ namespace MQTTnet.Core.Adapter |
|
|
{ |
|
|
{ |
|
|
private readonly IMqttCommunicationChannel _channel; |
|
|
private readonly IMqttCommunicationChannel _channel; |
|
|
|
|
|
|
|
|
private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write |
|
|
|
|
|
|
|
|
private readonly Task _waitTask = Task.FromResult(0); // this task is used to prevent overlapping write |
|
|
|
|
|
|
|
|
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) |
|
|
public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) |
|
|
{ |
|
|
{ |
|
@@ -79,31 +79,30 @@ namespace MQTTnet.Core.Adapter |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
lock (_channel) |
|
|
|
|
|
|
|
|
var waitTask = _waitTask; |
|
|
|
|
|
|
|
|
|
|
|
foreach (var packet in packets) |
|
|
{ |
|
|
{ |
|
|
foreach (var packet in packets) |
|
|
|
|
|
{ |
|
|
|
|
|
if (packet == null) {continue; } |
|
|
|
|
|
|
|
|
if (packet == null) { continue; } |
|
|
|
|
|
|
|
|
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); |
|
|
|
|
|
|
|
|
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); |
|
|
|
|
|
|
|
|
var writeBuffer = PacketSerializer.Serialize(packet); |
|
|
|
|
|
_sendTask = _sendTask.ContinueWith(p => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
var writeBuffer = PacketSerializer.Serialize(packet); |
|
|
|
|
|
|
|
|
|
|
|
waitTask = waitTask.ContinueWith(t => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
waitTask = waitTask.ContinueWith(t => |
|
|
|
|
|
{ |
|
|
if (timeout > TimeSpan.Zero) |
|
|
if (timeout > TimeSpan.Zero) |
|
|
{ |
|
|
{ |
|
|
_sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout), cancellationToken).ConfigureAwait(false); |
|
|
|
|
|
} |
|
|
|
|
|
else |
|
|
|
|
|
{ |
|
|
|
|
|
_sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken), cancellationToken); |
|
|
|
|
|
|
|
|
return _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
await _sendTask; // configure await false generates stackoverflow |
|
|
|
|
|
|
|
|
return _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); |
|
|
|
|
|
}, cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
await waitTask; // configure await false generates stackoverflow |
|
|
} |
|
|
} |
|
|
catch (TaskCanceledException) |
|
|
catch (TaskCanceledException) |
|
|
{ |
|
|
{ |
|
|