Переглянути джерело

Remove timeout from send method.

release/3.x.x
Christian 4 роки тому
джерело
коміт
1f78a92c4d
18 змінених файлів з 83 додано та 83 видалено
  1. +2
    -1
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  2. +1
    -1
      Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
  3. +2
    -10
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  4. +6
    -5
      Source/MQTTnet/Client/MqttClient.cs
  5. +4
    -3
      Source/MQTTnet/Client/MqttPacketIdentifierProvider.cs
  6. +1
    -1
      Source/MQTTnet/Client/Options/MqttClientTcpOptions.cs
  7. +7
    -1
      Source/MQTTnet/Implementations/CrossPlatformSocket.cs
  8. +25
    -28
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  9. +1
    -1
      Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs
  10. +10
    -9
      Source/MQTTnet/Server/MqttClientConnection.cs
  11. +4
    -4
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  12. +2
    -2
      Source/MQTTnet/Server/MqttPendingApplicationMessage.cs
  13. +1
    -1
      Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs
  14. +1
    -1
      Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
  15. +1
    -1
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs
  16. +4
    -2
      Tests/MQTTnet.Core.Tests/Protocol_Tests.cs
  17. +11
    -11
      Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
  18. +0
    -1
      Tests/MQTTnet.TestApp.NetCore/Program.cs

+ 2
- 1
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs Переглянути файл

@@ -167,7 +167,7 @@ namespace MQTTnet.AspNetCore
BytesSent = 0;
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
var formatter = PacketFormatterAdapter;
using (await _writerLock.WaitAsync(cancellationToken).ConfigureAwait(false))
@@ -180,6 +180,7 @@ namespace MQTTnet.AspNetCore
{
BytesSent += msg.Length;
}

PacketFormatterAdapter.FreeBuffer();
}
}


+ 1
- 1
Source/MQTTnet/Adapter/IMqttChannelAdapter.cs Переглянути файл

@@ -27,7 +27,7 @@ namespace MQTTnet.Adapter

Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken);

Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken);
Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken);

Task<MqttBasePacket> ReceivePacketAsync(CancellationToken cancellationToken);



+ 2
- 10
Source/MQTTnet/Adapter/MqttChannelAdapter.cs Переглянути файл

@@ -111,7 +111,7 @@ namespace MQTTnet.Adapter
}
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
@@ -125,15 +125,7 @@ namespace MQTTnet.Adapter
{
var packetData = PacketFormatterAdapter.Encode(packet);

if (timeout == TimeSpan.Zero)
{
await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);
}
else
{
await MqttTaskTimeout.WaitAsync(
t => _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, t), timeout, cancellationToken).ConfigureAwait(false);
}
await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);

Interlocked.Add(ref _bytesReceived, packetData.Count);



+ 6
- 5
Source/MQTTnet/Client/MqttClient.cs Переглянути файл

@@ -219,11 +219,13 @@ namespace MQTTnet.Client

public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

MqttTopicValidator.ThrowIfInvalid(applicationMessage);

ThrowIfDisposed();
ThrowIfNotConnected();
var publishPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(applicationMessage);

switch (applicationMessage.QualityOfServiceLevel)
@@ -381,7 +383,7 @@ namespace MQTTnet.Client

_sendTracker.Restart();

return _adapter.SendPacketAsync(packet, Options.CommunicationTimeout, cancellationToken);
return _adapter.SendPacketAsync(packet, cancellationToken);
}

async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
@@ -399,8 +401,7 @@ namespace MQTTnet.Client
try
{
_sendTracker.Restart();

await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -439,7 +440,7 @@ namespace MQTTnet.Client

if (waitTime <= TimeSpan.Zero)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPingRespPacket>(MqttPingReqPacket.Instance, cancellationToken).ConfigureAwait(false);
}

// Wait a fixed time in all cases. Calculation of the remaining time is complicated


+ 4
- 3
Source/MQTTnet/Client/MqttPacketIdentifierProvider.cs Переглянути файл

@@ -1,9 +1,10 @@
namespace MQTTnet.Client
{
public class MqttPacketIdentifierProvider
public sealed class MqttPacketIdentifierProvider
{
private readonly object _syncRoot = new object();
private ushort _value;
readonly object _syncRoot = new object();

ushort _value;

public void Reset()
{


+ 1
- 1
Source/MQTTnet/Client/Options/MqttClientTcpOptions.cs Переглянути файл

@@ -8,7 +8,7 @@ namespace MQTTnet.Client.Options

public int? Port { get; set; }

public int BufferSize { get; set; } = 65536;
public int BufferSize { get; set; } = 8192;

public bool? DualMode { get; set; }



+ 7
- 1
Source/MQTTnet/Implementations/CrossPlatformSocket.cs Переглянути файл

@@ -51,13 +51,19 @@ namespace MQTTnet.Implementations
get => _socket.ReceiveBufferSize;
set => _socket.ReceiveBufferSize = value;
}
public int SendBufferSize
{
get => _socket.SendBufferSize;
set => _socket.SendBufferSize = value;
}

public int SendTimeout
{
get => _socket.SendTimeout;
set => _socket.SendTimeout = value;
}

public EndPoint RemoteEndPoint => _socket.RemoteEndPoint;

public bool ReuseAddress


+ 25
- 28
Source/MQTTnet/Implementations/MqttTcpChannel.cs Переглянути файл

@@ -17,14 +17,14 @@ namespace MQTTnet.Implementations
public sealed class MqttTcpChannel : IMqttChannel
{
readonly IMqttClientOptions _clientOptions;
readonly MqttClientTcpOptions _options;
readonly MqttClientTcpOptions _tcpOptions;

Stream _stream;

public MqttTcpChannel(IMqttClientOptions clientOptions)
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
_options = (MqttClientTcpOptions)clientOptions.ChannelOptions;
_tcpOptions = (MqttClientTcpOptions)clientOptions.ChannelOptions;

IsSecureConnection = clientOptions.ChannelOptions?.TlsOptions?.UseTls == true;
}
@@ -50,34 +50,35 @@ namespace MQTTnet.Implementations
CrossPlatformSocket socket = null;
try
{
if (_options.AddressFamily == AddressFamily.Unspecified)
if (_tcpOptions.AddressFamily == AddressFamily.Unspecified)
{
socket = new CrossPlatformSocket();
}
else
{
socket = new CrossPlatformSocket(_options.AddressFamily);
socket = new CrossPlatformSocket(_tcpOptions.AddressFamily);
}

socket.ReceiveBufferSize = _options.BufferSize;
socket.SendBufferSize = _options.BufferSize;
socket.NoDelay = _options.NoDelay;
socket.ReceiveBufferSize = _tcpOptions.BufferSize;
socket.SendBufferSize = _tcpOptions.BufferSize;
socket.SendTimeout = (int)_clientOptions.CommunicationTimeout.TotalMilliseconds;
socket.NoDelay = _tcpOptions.NoDelay;

if (_options.DualMode.HasValue)
if (_tcpOptions.DualMode.HasValue)
{
// It is important to avoid setting the flag if no specific value is set by the user
// because on IPv4 only networks the setter will always throw an exception. Regardless
// of the actual value.
socket.DualMode = _options.DualMode.Value;
socket.DualMode = _tcpOptions.DualMode.Value;
}

await socket.ConnectAsync(_options.Server, _options.GetPort(), cancellationToken).ConfigureAwait(false);
await socket.ConnectAsync(_tcpOptions.Server, _tcpOptions.GetPort(), cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

var networkStream = socket.GetStream();
if (_options.TlsOptions?.UseTls == true)
if (_tcpOptions.TlsOptions?.UseTls == true)
{
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback);
try
@@ -85,16 +86,16 @@ namespace MQTTnet.Implementations
#if NETCOREAPP3_1
var sslOptions = new SslClientAuthenticationOptions
{
ApplicationProtocols = _options.TlsOptions.ApplicationProtocols,
ApplicationProtocols = _tcpOptions.TlsOptions.ApplicationProtocols,
ClientCertificates = LoadCertificates(),
EnabledSslProtocols = _options.TlsOptions.SslProtocol,
CertificateRevocationCheckMode = _options.TlsOptions.IgnoreCertificateRevocationErrors ? X509RevocationMode.NoCheck : X509RevocationMode.Online,
TargetHost = _options.Server
EnabledSslProtocols = _tcpOptions.TlsOptions.SslProtocol,
CertificateRevocationCheckMode = _tcpOptions.TlsOptions.IgnoreCertificateRevocationErrors ? X509RevocationMode.NoCheck : X509RevocationMode.Online,
TargetHost = _tcpOptions.Server
};

await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false);
#else
await sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(), _options.TlsOptions.SslProtocol, !_options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
await sslStream.AuthenticateAsClientAsync(_tcpOptions.Server, LoadCertificates(), _tcpOptions.TlsOptions.SslProtocol, !_tcpOptions.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
#endif
}
catch
@@ -132,8 +133,6 @@ namespace MQTTnet.Implementations

public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null) throw new ArgumentNullException(nameof(buffer));

cancellationToken.ThrowIfCancellationRequested();

try
@@ -174,8 +173,6 @@ namespace MQTTnet.Implementations

public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer is null) throw new ArgumentNullException(nameof(buffer));

cancellationToken.ThrowIfCancellationRequested();

try
@@ -232,7 +229,7 @@ namespace MQTTnet.Implementations
#region OBSOLETE

#pragma warning disable CS0618 // Type or member is obsolete
var certificateValidationCallback = _options?.TlsOptions?.CertificateValidationCallback;
var certificateValidationCallback = _tcpOptions?.TlsOptions?.CertificateValidationCallback;
#pragma warning restore CS0618 // Type or member is obsolete
if (certificateValidationCallback != null)
{
@@ -240,7 +237,7 @@ namespace MQTTnet.Implementations
}
#endregion

var certificateValidationHandler = _options?.TlsOptions?.CertificateValidationHandler;
var certificateValidationHandler = _tcpOptions?.TlsOptions?.CertificateValidationHandler;
if (certificateValidationHandler != null)
{
var context = new MqttClientCertificateValidationCallbackContext
@@ -248,7 +245,7 @@ namespace MQTTnet.Implementations
Certificate = x509Certificate,
Chain = chain,
SslPolicyErrors = sslPolicyErrors,
ClientOptions = _options
ClientOptions = _tcpOptions
};

return certificateValidationHandler(context);
@@ -261,7 +258,7 @@ namespace MQTTnet.Implementations

if (chain.ChainStatus.Any(c => c.Status == X509ChainStatusFlags.RevocationStatusUnknown || c.Status == X509ChainStatusFlags.Revoked || c.Status == X509ChainStatusFlags.OfflineRevocation))
{
if (_options?.TlsOptions?.IgnoreCertificateRevocationErrors != true)
if (_tcpOptions?.TlsOptions?.IgnoreCertificateRevocationErrors != true)
{
return false;
}
@@ -269,24 +266,24 @@ namespace MQTTnet.Implementations

if (chain.ChainStatus.Any(c => c.Status == X509ChainStatusFlags.PartialChain))
{
if (_options?.TlsOptions?.IgnoreCertificateChainErrors != true)
if (_tcpOptions?.TlsOptions?.IgnoreCertificateChainErrors != true)
{
return false;
}
}

return _options?.TlsOptions?.AllowUntrustedCertificates == true;
return _tcpOptions?.TlsOptions?.AllowUntrustedCertificates == true;
}

X509CertificateCollection LoadCertificates()
{
var certificates = new X509CertificateCollection();
if (_options.TlsOptions.Certificates == null)
if (_tcpOptions.TlsOptions.Certificates == null)
{
return certificates;
}

foreach (var certificate in _options.TlsOptions.Certificates)
foreach (var certificate in _tcpOptions.TlsOptions.Certificates)
{
certificates.Add(certificate);
}


+ 1
- 1
Source/MQTTnet/LowLevelClient/LowLevelMqttClient.cs Переглянути файл

@@ -76,7 +76,7 @@ namespace MQTTnet.LowLevelClient

try
{
await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
await _adapter.SendPacketAsync(packet, cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{


+ 10
- 9
Source/MQTTnet/Server/MqttClientConnection.cs Переглянути файл

@@ -100,7 +100,7 @@ namespace MQTTnet.Server
await _channelAdapter.SendPacketAsync(new MqttDisconnectPacket
{
ReasonCode = reason
}, _serverOptions.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -485,16 +485,17 @@ namespace MQTTnet.Server
}
}

async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);

Interlocked.Increment(ref _receivedPacketsCount);

if (packet is MqttPublishPacket)
return _channelAdapter.SendPacketAsync(packet, cancellationToken).ContinueWith(task =>
{
Interlocked.Increment(ref _receivedApplicationMessagesCount);
}
Interlocked.Increment(ref _receivedPacketsCount);

if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _receivedApplicationMessagesCount);
}
}, cancellationToken);
}
}
}

+ 4
- 4
Source/MQTTnet/Server/MqttClientSessionsManager.cs Переглянути файл

@@ -18,7 +18,7 @@ namespace MQTTnet.Server
{
public sealed class MqttClientSessionsManager : IDisposable
{
readonly BlockingCollection<MqttEnqueuedApplicationMessage> _messageQueue = new BlockingCollection<MqttEnqueuedApplicationMessage>();
readonly BlockingCollection<MqttPendingApplicationMessage> _messageQueue = new BlockingCollection<MqttPendingApplicationMessage>();

readonly object _createConnectionSyncRoot = new object();
readonly Dictionary<string, MqttClientConnection> _connections = new Dictionary<string, MqttClientConnection>();
@@ -92,7 +92,7 @@ namespace MQTTnet.Server
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, _options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false);

return;
}
@@ -177,7 +177,7 @@ namespace MQTTnet.Server
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

_messageQueue.Add(new MqttEnqueuedApplicationMessage(applicationMessage, sender));
_messageQueue.Add(new MqttPendingApplicationMessage(applicationMessage, sender));
}

public Task SubscribeAsync(string clientId, ICollection<MqttTopicFilter> topicFilters)
@@ -273,7 +273,7 @@ namespace MQTTnet.Server
{
cancellationToken.ThrowIfCancellationRequested();

MqttEnqueuedApplicationMessage queuedApplicationMessage;
MqttPendingApplicationMessage queuedApplicationMessage;
try
{
queuedApplicationMessage = _messageQueue.Take(cancellationToken);


Source/MQTTnet/Server/MqttEnqueuedApplicationMessage.cs → Source/MQTTnet/Server/MqttPendingApplicationMessage.cs Переглянути файл

@@ -1,8 +1,8 @@
namespace MQTTnet.Server
{
public class MqttEnqueuedApplicationMessage
public sealed class MqttPendingApplicationMessage
{
public MqttEnqueuedApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender)
public MqttPendingApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender)
{
Sender = sender;
ApplicationMessage = applicationMessage;

+ 1
- 1
Tests/MQTTnet.AspNetCore.Tests/MqttConnectionContextTest.cs Переглянути файл

@@ -85,7 +85,7 @@ namespace MQTTnet.AspNetCore.Tests
connection.Transport = pipe;
var ctx = new MqttConnectionContext(serializer, connection);

await ctx.SendPacketAsync(new MqttPublishPacket() { Payload = new byte[20_000] }, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);
await ctx.SendPacketAsync(new MqttPublishPacket { Payload = new byte[20_000] }, CancellationToken.None).ConfigureAwait(false);

var readResult = await pipe.Send.Reader.ReadAsync();
Assert.IsTrue(readResult.Buffer.Length > 20000);


+ 1
- 1
Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs Переглянути файл

@@ -66,7 +66,7 @@ namespace MQTTnet.Benchmarks

for (var i = 0; i < 10000; i++)
{
_channelAdapter.SendPacketAsync(_packet, TimeSpan.Zero, CancellationToken.None).GetAwaiter().GetResult();
_channelAdapter.SendPacketAsync(_packet, CancellationToken.None).GetAwaiter().GetResult();
}

_stream.Position = 0;


+ 1
- 1
Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs Переглянути файл

@@ -42,7 +42,7 @@ namespace MQTTnet.Tests.Mockups
return Task.FromResult(0);
}

public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
ThrowIfPartnerIsNull();



+ 4
- 2
Tests/MQTTnet.Core.Tests/Protocol_Tests.cs Переглянути файл

@@ -11,9 +11,11 @@ namespace MQTTnet.Tests
{
for (uint i = 0; i < 268435455; i++)
{
var buffer = MqttPacketWriter.EncodeVariableLengthInteger(i);
var reader = new MqttPacketBodyReader(buffer.Array, buffer.Offset, buffer.Count);
var writer = new MqttPacketWriter();
writer.WriteVariableLengthInteger(i);
var buffer = writer.GetBuffer();

var reader = new MqttPacketBodyReader(buffer, 0, writer.Length);
var checkValue = reader.ReadVariableLengthInteger();

Assert.AreEqual(i, checkValue);


+ 11
- 11
Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs
Різницю між файлами не показано, бо вона завелика
Переглянути файл


+ 0
- 1
Tests/MQTTnet.TestApp.NetCore/Program.cs Переглянути файл

@@ -31,7 +31,6 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine("a = Start QoS 2 benchmark");
Console.WriteLine("b = Start QoS 1 benchmark");
Console.WriteLine("c = Start QoS 0 benchmark");
Console.WriteLine("c = Start QoS 0 benchmark");

var pressedKey = Console.ReadKey(true);
if (pressedKey.KeyChar == '1')


Завантаження…
Відмінити
Зберегти