Browse Source

Avoid unnecessary enumeration for every packet

release/3.x.x
Israel Lot 6 years ago
parent
commit
5300742543
2 changed files with 30 additions and 26 deletions
  1. +1
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs
  2. +29
    -25
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs View File

@@ -15,7 +15,7 @@ namespace MQTTnet.Adapter


Task DisconnectAsync(TimeSpan timeout); Task DisconnectAsync(TimeSpan timeout);


Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets);
Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets);


Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken); Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken);
} }


+ 29
- 25
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -51,40 +51,44 @@ namespace MQTTnet.Adapter
return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
} }


public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets)
{
for(var i=0;i<packets.Length;i++)
{
await SendPacketsAsync(timeout, cancellationToken, packets[i]).ConfigureAwait(false);
}

}

public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket packet)
{ {
ThrowIfDisposed(); ThrowIfDisposed();


if (packet == null)
{
return Task.FromResult(0);
}

return ExecuteAndWrapExceptionAsync(async () => return ExecuteAndWrapExceptionAsync(async () =>
{ {
foreach (var packet in packets)
if (cancellationToken.IsCancellationRequested)
{ {
if (cancellationToken.IsCancellationRequested)
{
return;
}

if (packet == null)
{
continue;
}

_logger.Verbose<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);
return;
}


var packetData = PacketSerializer.Serialize(packet);
if (cancellationToken.IsCancellationRequested)
{
return;
}
await _channel.SendStream.WriteAsync(
packetData.Array,
packetData.Offset,
(int)packetData.Count,
cancellationToken).ConfigureAwait(false);
_logger.Verbose<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout);


var packetData = PacketSerializer.Serialize(packet);
if (cancellationToken.IsCancellationRequested)
{
return;
} }

await _channel.SendStream.WriteAsync(
packetData.Array,
packetData.Offset,
(int)packetData.Count,
cancellationToken).ConfigureAwait(false);


}); });
} }


Loading…
Cancel
Save