|
|
@@ -83,35 +83,39 @@ namespace MQTTnet.Core.Adapter |
|
|
|
{ |
|
|
|
foreach (var packet in packets) |
|
|
|
{ |
|
|
|
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); |
|
|
|
if (packet == null) continue; |
|
|
|
|
|
|
|
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); |
|
|
|
|
|
|
|
var writeBuffer = PacketSerializer.Serialize(packet); |
|
|
|
_sendTask = _sendTask.ContinueWith(p => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
if (timeout > TimeSpan.Zero) |
|
|
|
{ |
|
|
|
_sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout), cancellationToken);// _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout);//.ConfigureAwait(false); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
_sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken), cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
await _sendTask; // configure await false generates stackoverflow |
|
|
|
|
|
|
|
if (timeout > TimeSpan.Zero) |
|
|
|
{ |
|
|
|
await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (TaskCanceledException) |
|
|
|
catch (TaskCanceledException ex) |
|
|
|
{ |
|
|
|
throw; |
|
|
|
throw ex; |
|
|
|
} |
|
|
|
catch (MqttCommunicationTimedOutException) |
|
|
|
catch (MqttCommunicationTimedOutException ex) |
|
|
|
{ |
|
|
|
throw; |
|
|
|
throw ex; |
|
|
|
} |
|
|
|
catch (MqttCommunicationException) |
|
|
|
catch (MqttCommunicationException ex) |
|
|
|
{ |
|
|
|
throw; |
|
|
|
throw ex; |
|
|
|
} |
|
|
|
catch (Exception exception) |
|
|
|
{ |
|
|
|