From 3587a8ebd1651f53fac8a7cf3b2edb11b2085c27 Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 25 Mar 2018 14:42:34 +0200 Subject: [PATCH] Add new option which allows configuring the send interval for keep alive packets. --- .../Client/IMqttClientOptions.cs | 2 + .../MQTTnet.NetStandard/Client/MqttClient.cs | 51 +++++++++++++++---- .../Client/MqttClientOptions.cs | 4 +- .../Client/MqttPacketDispatcher.cs | 24 ++++++--- 4 files changed, 63 insertions(+), 18 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs index 035dba1..2f5b504 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs @@ -13,6 +13,8 @@ namespace MQTTnet.Client TimeSpan CommunicationTimeout { get; } TimeSpan KeepAlivePeriod { get; } + TimeSpan? KeepAliveSendInterval { get; set; } + MqttProtocolVersion ProtocolVersion { get; } IMqttClientChannelOptions ChannelOptions { get; } diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index ffdc7f2..d01a47c 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -15,6 +16,7 @@ namespace MQTTnet.Client public class MqttClient : IMqttClient { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); + private readonly Stopwatch _sendTracker = new Stopwatch(); private readonly IMqttClientAdapterFactory _adapterFactory; private readonly MqttPacketDispatcher _packetDispatcher; private readonly IMqttNetLogger _logger; @@ -59,10 +61,12 @@ namespace MQTTnet.Client _logger.Trace("Connection with server established."); await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false); - var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); + var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); _logger.Trace("MQTT connection with server established."); + _sendTracker.Restart(); + if (_options.KeepAlivePeriod != TimeSpan.Zero) { StartSendingKeepAliveMessages(_cancellationTokenSource.Token); @@ -149,7 +153,7 @@ namespace MQTTnet.Client case MqttQualityOfServiceLevel.AtMostOnce: { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosGroup).ConfigureAwait(false); + await SendAsync(qosGroup.ToArray()).ConfigureAwait(false); break; } case MqttQualityOfServiceLevel.AtLeastOnce: @@ -167,8 +171,14 @@ namespace MQTTnet.Client foreach (var publishPacket in qosGroup) { publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(); + var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + var pubRelPacket = new MqttPubRelPacket + { + PacketIdentifier = pubRecPacket.PacketIdentifier + }; + + await SendAndReceiveAsync(pubRelPacket).ConfigureAwait(false); } break; @@ -325,24 +335,31 @@ namespace MQTTnet.Client private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) { - return SendAsync(pubRelPacket.CreateResponse()); + var response = new MqttPubCompPacket + { + PacketIdentifier = pubRelPacket.PacketIdentifier + }; + + return SendAsync(response); } - private Task SendAsync(MqttBasePacket packet) + private Task SendAsync(params MqttBasePacket[] packets) { - return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packet); + _sendTracker.Restart(); + return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packets); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - ushort identifier = 0; + ushort? identifier = null; if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier) { identifier = requestPacketWithIdentifier.PacketIdentifier; } var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout); - await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); + await SendAsync(requestPacket).ConfigureAwait(false); + return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); } @@ -354,8 +371,22 @@ namespace MQTTnet.Client { while (!cancellationToken.IsCancellationRequested) { - await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); - await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); + TimeSpan keepAliveSendInterval; + if (_options.KeepAliveSendInterval.HasValue) + { + keepAliveSendInterval = _options.KeepAliveSendInterval.Value; + } + else + { + keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75); + } + + if (_sendTracker.Elapsed > keepAliveSendInterval) + { + await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); + } + + await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs index 20e711f..9640a47 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs @@ -13,7 +13,9 @@ namespace MQTTnet.Client public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials(); - public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); + public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15); + + public TimeSpan? KeepAliveSendInterval { get; set; } public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs index ca75931..8b09fbc 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Client _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - public async Task WaitForPacketAsync(Type responseType, ushort identifier, TimeSpan timeout) + public async Task WaitForPacketAsync(Type responseType, ushort? identifier, TimeSpan timeout) { var packetAwaiter = AddPacketAwaiter(responseType, identifier); try @@ -44,13 +44,13 @@ namespace MQTTnet.Client if (_awaiters.TryGetValue(type, out var byId)) { - ushort identifier = 0; + ushort? identifier = 0; if (packet is IMqttPacketWithIdentifier packetWithIdentifier) { identifier = packetWithIdentifier.PacketIdentifier; } - if (byId.TryRemove(identifier, out var tcs)) + if (byId.TryRemove(identifier.Value, out var tcs)) { tcs.TrySetResult(packet); return; @@ -65,12 +65,17 @@ namespace MQTTnet.Client _awaiters.Clear(); } - private TaskCompletionSource AddPacketAwaiter(Type responseType, ushort identifier) + private TaskCompletionSource AddPacketAwaiter(Type responseType, ushort? identifier) { var tcs = new TaskCompletionSource(); + if (!identifier.HasValue) + { + identifier = 0; + } + var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - if (!byId.TryAdd(identifier, tcs)) + if (!byId.TryAdd(identifier.Value, tcs)) { throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); } @@ -78,10 +83,15 @@ namespace MQTTnet.Client return tcs; } - private void RemovePacketAwaiter(Type responseType, ushort identifier) + private void RemovePacketAwaiter(Type responseType, ushort? identifier) { + if (!identifier.HasValue) + { + identifier = 0; + } + var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary>()); - byId.TryRemove(identifier, out var _); + byId.TryRemove(identifier.Value, out var _); } } } \ No newline at end of file