From 2267770a72c4ea0bb33b74991cae8231f5a412ed Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 2 Apr 2019 21:51:26 +0200 Subject: [PATCH] Make internal message of server async. --- .../MqttConnectionContext.cs | 16 ++-- Source/MQTTnet/Adapter/IMqttChannelAdapter.cs | 10 ++- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 51 ++++++++----- .../Formatter/MqttPacketFormatterAdapter.cs | 7 +- Source/MQTTnet/Formatter/MqttPacketReader.cs | 70 ++++++------------ .../MQTTnet/Formatter/MqttProtocolVersion.cs | 2 + Source/MQTTnet/Internal/AsyncBlockingQueue.cs | 28 ++++++- .../Internal/AsyncQueueDequeueResult.cs | 15 ++++ Source/MQTTnet/Server/MqttClientConnection.cs | 38 ++++++---- .../Server/MqttClientKeepAliveMonitor.cs | 1 - ...ttClientSessionApplicationMessagesQueue.cs | 74 +++++-------------- .../Server/MqttClientSessionsManager.cs | 5 +- .../Server/MqttQueuedApplicationMessage.cs | 17 +++++ .../Server/Status/IMqttClientStatus.cs | 4 + .../MQTTnet/Server/Status/MqttClientStatus.cs | 8 +- 15 files changed, 192 insertions(+), 154 deletions(-) create mode 100644 Source/MQTTnet/Internal/AsyncQueueDequeueResult.cs create mode 100644 Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index 4a9eb91..a0ad1fe 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -20,12 +20,17 @@ namespace MQTTnet.AspNetCore } public string Endpoint => Connection.ConnectionId; - public bool IsSecureConnection => false; // TODO: Fix detection. + public bool IsSecureConnection => false; // TODO: Fix detection (WS vs. WSS). + public ConnectionContext Connection { get; } public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } - public event EventHandler ReadingPacketStarted; - public event EventHandler ReadingPacketCompleted; + public long BytesSent { get; } // TODO: Fix calculation. + public long BytesReceived { get; } // TODO: Fix calculation. + + public Action ReadingPacketStartedCallback { get; set; } + public Action ReadingPacketCompletedCallback { get; set; } + private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) @@ -34,6 +39,7 @@ namespace MQTTnet.AspNetCore { return tcp.StartAsync(); } + return Task.CompletedTask; } @@ -80,7 +86,7 @@ namespace MQTTnet.AspNetCore else { // we did receive something but the message is not yet complete - ReadingPacketStarted?.Invoke(this, EventArgs.Empty); + ReadingPacketStartedCallback?.Invoke(); } } else if (readResult.IsCompleted) @@ -99,7 +105,7 @@ namespace MQTTnet.AspNetCore } finally { - ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); + ReadingPacketCompletedCallback?.Invoke(); } cancellationToken.ThrowIfCancellationRequested(); diff --git a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs index e759221..08bc809 100644 --- a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs @@ -13,10 +13,14 @@ namespace MQTTnet.Adapter bool IsSecureConnection { get; } MqttPacketFormatterAdapter PacketFormatterAdapter { get; } - - event EventHandler ReadingPacketStarted; - event EventHandler ReadingPacketCompleted; + long BytesSent { get; } + + long BytesReceived { get; } + + Action ReadingPacketStartedCallback { get; set; } + + Action ReadingPacketCompletedCallback { get; set; } Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken); diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index eba18f7..55a3825 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -25,16 +25,19 @@ namespace MQTTnet.Adapter private readonly MqttPacketReader _packetReader; private readonly byte[] _fixedHeaderBuffer = new byte[2]; - + private bool _isDisposed; + private long _bytesReceived; + private long _bytesSent; + public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _channel = channel ?? throw new ArgumentNullException(nameof(channel)); PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter)); - + _packetReader = new MqttPacketReader(_channel); _logger = logger.CreateChildLogger(nameof(MqttChannelAdapter)); @@ -46,9 +49,12 @@ namespace MQTTnet.Adapter public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } - public event EventHandler ReadingPacketStarted; - public event EventHandler ReadingPacketCompleted; + public long BytesSent => Interlocked.Read(ref _bytesSent); + public long BytesReceived => Interlocked.Read(ref _bytesReceived); + public Action ReadingPacketStartedCallback { get; set; } + public Action ReadingPacketCompletedCallback { get; set; } + public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -62,7 +68,7 @@ namespace MQTTnet.Adapter else { await MqttTaskTimeout.WaitAsync(t => _channel.ConnectAsync(t), timeout, cancellationToken).ConfigureAwait(false); - } + } } catch (Exception exception) { @@ -101,7 +107,7 @@ namespace MQTTnet.Adapter WrapException(exception); } } - + public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) { await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); @@ -118,7 +124,9 @@ namespace MQTTnet.Adapter await MqttTaskTimeout.WaitAsync( t => _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, t), timeout, cancellationToken).ConfigureAwait(false); } - + + Interlocked.Add(ref _bytesReceived, packetData.Count); + PacketFormatterAdapter.FreeBuffer(); _logger.Verbose("TX ({0} bytes) >>> {1}", packetData.Count, packet); @@ -159,7 +167,9 @@ namespace MQTTnet.Adapter return null; } - if (!PacketFormatterAdapter.ProtocolVersion.HasValue) + Interlocked.Add(ref _bytesSent, receivedMqttPacket.TotalLength); + + if (PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.Unknown) { PacketFormatterAdapter.DetectProtocolVersion(receivedMqttPacket); } @@ -190,10 +200,21 @@ namespace MQTTnet.Adapter return null; } + public void ResetStatistics() + { + Interlocked.Exchange(ref _bytesReceived, 0L); + Interlocked.Exchange(ref _bytesSent, 0L); + } + private async Task ReceiveAsync(CancellationToken cancellationToken) { var readFixedHeaderResult = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false); + if (cancellationToken.IsCancellationRequested) + { + return null; + } + try { if (readFixedHeaderResult.ConnectionClosed) @@ -201,7 +222,7 @@ namespace MQTTnet.Adapter return null; } - ReadingPacketStarted?.Invoke(this, EventArgs.Empty); + ReadingPacketStartedCallback?.Invoke(); var fixedHeader = readFixedHeaderResult.FixedHeader; if (fixedHeader.RemainingLength == 0) @@ -221,13 +242,7 @@ namespace MQTTnet.Adapter chunkSize = bytesLeft; } -#if WINDOWS_UWP - var readBytes = await _channel.ReadAsync(body, bodyOffset, (int)chunkSize, cancellationToken).ConfigureAwait(false); -#else - // async/await is not used to avoid the overhead of context switches. We assume that the remaining data - // has been sent from the sender directly after the initial bytes. - var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); -#endif + var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); if (cancellationToken.IsCancellationRequested) { @@ -238,7 +253,7 @@ namespace MQTTnet.Adapter { return null; } - + bodyOffset += readBytes; } while (bodyOffset < body.Length); @@ -247,7 +262,7 @@ namespace MQTTnet.Adapter } finally { - ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); + ReadingPacketCompletedCallback?.Invoke(); } } diff --git a/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs b/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs index 104318e..745c1a3 100644 --- a/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs +++ b/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs @@ -20,7 +20,7 @@ namespace MQTTnet.Formatter UseProtocolVersion(protocolVersion); } - public MqttProtocolVersion? ProtocolVersion { get; private set; } + public MqttProtocolVersion ProtocolVersion { get; private set; } public IMqttDataConverter DataConverter { @@ -64,6 +64,11 @@ namespace MQTTnet.Formatter private void UseProtocolVersion(MqttProtocolVersion protocolVersion) { + if (protocolVersion == MqttProtocolVersion.Unknown) + { + throw new InvalidOperationException("MQTT protocol version is invalid."); + } + ProtocolVersion = protocolVersion; switch (protocolVersion) diff --git a/Source/MQTTnet/Formatter/MqttPacketReader.cs b/Source/MQTTnet/Formatter/MqttPacketReader.cs index b4e7b67..2589c5b 100644 --- a/Source/MQTTnet/Formatter/MqttPacketReader.cs +++ b/Source/MQTTnet/Formatter/MqttPacketReader.cs @@ -3,7 +3,6 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Exceptions; -using MQTTnet.Internal; namespace MQTTnet.Formatter { @@ -31,8 +30,11 @@ namespace MQTTnet.Formatter { var bytesRead = await _channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); - + if (cancellationToken.IsCancellationRequested) + { + return null; + } + if (bytesRead == 0) { return new ReadFixedHeaderResult @@ -100,13 +102,19 @@ namespace MQTTnet.Formatter return null; } - var buffer = ReadByte(cancellationToken); - if (!buffer.HasValue) + var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); + + if (cancellationToken.IsCancellationRequested) { return null; } - encodedByte = buffer.Value; + if (readCount == 0) + { + return null; + } + + encodedByte = _singleByteBuffer[0]; value += (encodedByte & 127) * multiplier; multiplier *= 128; @@ -114,26 +122,8 @@ namespace MQTTnet.Formatter return value; } - - private byte? ReadByte(CancellationToken cancellationToken) - { - var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); - - if (cancellationToken.IsCancellationRequested) - { - return null; - } - - if (readCount == 0) - { - return null; - } - - return _singleByteBuffer[0]; - } - #else - + private async Task ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) { var offset = 0; @@ -154,13 +144,19 @@ namespace MQTTnet.Formatter return null; } - var buffer = await ReadByteAsync(cancellationToken).ConfigureAwait(false); - if (!buffer.HasValue) + var readCount = await _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false); + + if (cancellationToken.IsCancellationRequested) + { + return null; + } + + if (readCount == 0) { return null; } - encodedByte = buffer.Value; + encodedByte = _singleByteBuffer[0]; value += (encodedByte & 127) * multiplier; multiplier *= 128; @@ -168,24 +164,6 @@ namespace MQTTnet.Formatter return value; } - - private async Task ReadByteAsync(CancellationToken cancellationToken) - { - var readCount = await _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false); - - if (cancellationToken.IsCancellationRequested) - { - return null; - } - - if (readCount == 0) - { - return null; - } - - return _singleByteBuffer[0]; - } - #endif } } diff --git a/Source/MQTTnet/Formatter/MqttProtocolVersion.cs b/Source/MQTTnet/Formatter/MqttProtocolVersion.cs index c5e05e5..2121e7e 100644 --- a/Source/MQTTnet/Formatter/MqttProtocolVersion.cs +++ b/Source/MQTTnet/Formatter/MqttProtocolVersion.cs @@ -2,6 +2,8 @@ { public enum MqttProtocolVersion { + Unknown = 0, + V310 = 3, V311 = 4, V500 = 5 diff --git a/Source/MQTTnet/Internal/AsyncBlockingQueue.cs b/Source/MQTTnet/Internal/AsyncBlockingQueue.cs index 678be11..6cb80d2 100644 --- a/Source/MQTTnet/Internal/AsyncBlockingQueue.cs +++ b/Source/MQTTnet/Internal/AsyncBlockingQueue.cs @@ -8,7 +8,10 @@ namespace MQTTnet.Internal public sealed class AsyncQueue : IDisposable { private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0); - private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + + private ConcurrentQueue _queue = new ConcurrentQueue(); + + public int Count => _queue.Count; public void Enqueue(TItem item) { @@ -16,9 +19,9 @@ namespace MQTTnet.Internal _semaphore.Release(); } - public async Task DequeueAsync(CancellationToken cancellationToken) + public async Task> TryDequeueAsync(CancellationToken cancellationToken) { - while (true) + while (!cancellationToken.IsCancellationRequested) { await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); @@ -26,9 +29,26 @@ namespace MQTTnet.Internal if (_queue.TryDequeue(out var item)) { - return item; + return new AsyncQueueDequeueResult(true, item); } } + + return new AsyncQueueDequeueResult(false, default(TItem)); + } + + public AsyncQueueDequeueResult TryDequeue() + { + if (_queue.TryDequeue(out var item)) + { + return new AsyncQueueDequeueResult(true, item); + } + + return new AsyncQueueDequeueResult(false, default(TItem)); + } + + public void Clear() + { + Interlocked.Exchange(ref _queue, new ConcurrentQueue()); } public void Dispose() diff --git a/Source/MQTTnet/Internal/AsyncQueueDequeueResult.cs b/Source/MQTTnet/Internal/AsyncQueueDequeueResult.cs new file mode 100644 index 0000000..3e2b07b --- /dev/null +++ b/Source/MQTTnet/Internal/AsyncQueueDequeueResult.cs @@ -0,0 +1,15 @@ +namespace MQTTnet.Internal +{ + public class AsyncQueueDequeueResult + { + public AsyncQueueDequeueResult(bool isSuccess, TItem item) + { + IsSuccess = isSuccess; + Item = item; + } + + public bool IsSuccess { get; } + + public TItem Item { get; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 05a10a5..7ce7adf 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -34,8 +34,10 @@ namespace MQTTnet.Server private readonly MqttConnectPacket _connectPacket; private DateTime _lastPacketReceivedTimestamp; + private DateTime _lastNonKeepAlivePacketReceivedTimestamp; + private long _receivedPacketsCount; - private long _sentPacketsCount; + private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. private long _receivedApplicationMessagesCount; private long _sentApplicationMessagesCount; @@ -64,6 +66,7 @@ namespace MQTTnet.Server _keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger); _lastPacketReceivedTimestamp = DateTime.UtcNow; + _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp; } public string ClientId => _connectPacket.ClientId; @@ -85,7 +88,7 @@ namespace MQTTnet.Server { status.ClientId = ClientId; status.Endpoint = _endpoint; - status.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion.Value; + status.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion; status.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount); status.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount); @@ -94,8 +97,10 @@ namespace MQTTnet.Server status.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount); status.LastPacketReceivedTimestamp = _lastPacketReceivedTimestamp; + status.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp; - //status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; + status.BytesSent = _channelAdapter.BytesSent; + status.BytesReceived = _channelAdapter.BytesReceived; } //public void ClearPendingApplicationMessages() @@ -131,10 +136,9 @@ namespace MQTTnet.Server try { _logger.Info("Client '{0}': Session started.", ClientId); - //_eventDispatcher.OnClientConnected(ClientId); - - _channelAdapter.ReadingPacketStarted += OnAdapterReadingPacketStarted; - _channelAdapter.ReadingPacketCompleted += OnAdapterReadingPacketCompleted; + + _channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted; + _channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted; Session.WillMessage = _connectPacket.WillMessage; @@ -166,7 +170,12 @@ namespace MQTTnet.Server Interlocked.Increment(ref _sentPacketsCount); _lastPacketReceivedTimestamp = DateTime.UtcNow; - + + if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket)) + { + _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp; + } + _keepAliveMonitor.PacketReceived(); if (packet is MqttPublishPacket publishPacket) @@ -243,12 +252,11 @@ namespace MQTTnet.Server _packetDispatcher.Reset(); - _channelAdapter.ReadingPacketStarted -= OnAdapterReadingPacketStarted; - _channelAdapter.ReadingPacketCompleted -= OnAdapterReadingPacketCompleted; + _channelAdapter.ReadingPacketStartedCallback = null; + _channelAdapter.ReadingPacketCompletedCallback = null; _logger.Info("Client '{0}': Session stopped.", ClientId); - //_eventDispatcher.OnClientDisconnected(ClientId); - + _packageReceiverTask = null; } @@ -376,7 +384,7 @@ namespace MQTTnet.Server private async Task SendPendingPacketsAsync(CancellationToken cancellationToken) { - MqttPendingApplicationMessage queuedApplicationMessage = null; + MqttQueuedApplicationMessage queuedApplicationMessage = null; MqttPublishPacket publishPacket = null; try @@ -501,12 +509,12 @@ namespace MQTTnet.Server } } - private void OnAdapterReadingPacketCompleted(object sender, EventArgs e) + private void OnAdapterReadingPacketCompleted() { _keepAliveMonitor?.Resume(); } - private void OnAdapterReadingPacketStarted(object sender, EventArgs e) + private void OnAdapterReadingPacketStarted() { _keepAliveMonitor?.Pause(); } diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index 3f89d19..0ca41ab 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -3,7 +3,6 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MQTTnet.Diagnostics; -using MQTTnet.Packets; namespace MQTTnet.Server { diff --git a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs index 7fbd9e2..901ac75 100644 --- a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs +++ b/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs @@ -1,54 +1,29 @@ using MQTTnet.Internal; using MQTTnet.Protocol; using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace MQTTnet.Server { - public class MqttPendingApplicationMessage - { - public MqttApplicationMessage ApplicationMessage { get; set; } - - public string SenderClientId { get; set; } - - public bool IsRetainedMessage { get; set; } - - public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } - - public bool IsDuplicate { get; set; } - } - public class MqttClientSessionApplicationMessagesQueue : IDisposable { - private readonly Queue _messageQueue = new Queue(); - private readonly AsyncAutoResetEvent _messageQueueLock = new AsyncAutoResetEvent(); - + private readonly AsyncQueue _messageQueue = new AsyncQueue(); + private readonly IMqttServerOptions _options; public MqttClientSessionApplicationMessagesQueue(IMqttServerOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); - } - public int Count - { - get - { - lock (_messageQueue) - { - return _messageQueue.Count; - } - } - } + public int Count => _messageQueue.Count; public void Enqueue(MqttApplicationMessage applicationMessage, string senderClientId, MqttQualityOfServiceLevel qualityOfServiceLevel, bool isRetainedMessage) { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - Enqueue(new MqttPendingApplicationMessage + Enqueue(new MqttQueuedApplicationMessage { ApplicationMessage = applicationMessage, SenderClientId = senderClientId, @@ -59,39 +34,23 @@ namespace MQTTnet.Server public void Clear() { - lock (_messageQueue) - { - _messageQueue.Clear(); - } - } - - public void Dispose() - { + _messageQueue.Clear(); } - public async Task TakeAsync(CancellationToken cancellationToken) + public async Task TakeAsync(CancellationToken cancellationToken) { - // TODO: Create a blocking queue from this. - - while (!cancellationToken.IsCancellationRequested) + var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); + if (!dequeueResult.IsSuccess) { - lock (_messageQueue) - { - if (_messageQueue.Count > 0) - { - return _messageQueue.Dequeue(); - } - } - - await _messageQueueLock.WaitOneAsync(cancellationToken).ConfigureAwait(false); + return null; } - return null; + return dequeueResult.Item; } - public void Enqueue(MqttPendingApplicationMessage enqueuedApplicationMessage) + public void Enqueue(MqttQueuedApplicationMessage queuedApplicationMessage) { - if (enqueuedApplicationMessage == null) throw new ArgumentNullException(nameof(enqueuedApplicationMessage)); + if (queuedApplicationMessage == null) throw new ArgumentNullException(nameof(queuedApplicationMessage)); lock (_messageQueue) { @@ -104,14 +63,17 @@ namespace MQTTnet.Server if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) { - _messageQueue.Dequeue(); + _messageQueue.TryDequeue(); } } - _messageQueue.Enqueue(enqueuedApplicationMessage); + _messageQueue.Enqueue(queuedApplicationMessage); } + } - _messageQueueLock.Set(); + public void Dispose() + { + _messageQueue.Dispose(); } } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index a88d315..72fca9b 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -68,7 +68,7 @@ namespace MQTTnet.Server foreach (var connection in _connections.Values) { - var clientStatus = new MqttClientStatus(connection, this); + var clientStatus = new MqttClientStatus(connection); connection.FillStatus(clientStatus); var sessionStatus = new MqttSessionStatus(connection.Session, this); @@ -175,7 +175,8 @@ namespace MQTTnet.Server return; } - var queuedApplicationMessage = await _messageQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); + var queuedApplicationMessage = dequeueResult.Item; var sender = queuedApplicationMessage.Sender; var applicationMessage = queuedApplicationMessage.ApplicationMessage; diff --git a/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs b/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs new file mode 100644 index 0000000..5a3300a --- /dev/null +++ b/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs @@ -0,0 +1,17 @@ +using MQTTnet.Protocol; + +namespace MQTTnet.Server +{ + public class MqttQueuedApplicationMessage + { + public MqttApplicationMessage ApplicationMessage { get; set; } + + public string SenderClientId { get; set; } + + public bool IsRetainedMessage { get; set; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } + + public bool IsDuplicate { get; set; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs index b751f29..714b00d 100644 --- a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs @@ -26,6 +26,10 @@ namespace MQTTnet.Server.Status IMqttSessionStatus Session { get; } + long BytesSent { get; } + + long BytesReceived { get; } + Task DisconnectAsync(); } } diff --git a/Source/MQTTnet/Server/Status/MqttClientStatus.cs b/Source/MQTTnet/Server/Status/MqttClientStatus.cs index 79daa46..cc6e77d 100644 --- a/Source/MQTTnet/Server/Status/MqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttClientStatus.cs @@ -6,13 +6,11 @@ namespace MQTTnet.Server.Status { public class MqttClientStatus : IMqttClientStatus { - private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttClientConnection _connection; - public MqttClientStatus(MqttClientConnection connection, MqttClientSessionsManager sessionsManager) + public MqttClientStatus(MqttClientConnection connection) { _connection = connection; - _sessionsManager = sessionsManager; } public string ClientId { get; set; } @@ -35,6 +33,10 @@ namespace MQTTnet.Server.Status public IMqttSessionStatus Session { get; set; } + public long BytesSent { get; set; } + + public long BytesReceived { get; set; } + public Task DisconnectAsync() { return _connection.StopAsync();